Save/restore Tower
mvines committed May 6, 2020
1 parent 445e666 commit 3ede750
Showing 10 changed files with 303 additions and 39 deletions.
34 changes: 34 additions & 0 deletions core/benches/consensus.rs
@@ -0,0 +1,34 @@
#![feature(test)]

extern crate solana_core;
extern crate test;

use solana_core::consensus::Tower;
use solana_ledger::bank_forks::BankForks;
use solana_runtime::bank::Bank;
use solana_sdk::{
pubkey::Pubkey,
signature::{Keypair, Signer},
};
use std::sync::Arc;
use tempfile::TempDir;
use test::Bencher;

#[bench]
fn bench_save_tower(bench: &mut Bencher) {
let dir = TempDir::new().unwrap();
let path = dir.path();

let vote_account_pubkey = &Pubkey::default();
let node_keypair = Arc::new(Keypair::new());
let tower = Tower::new(
&node_keypair.pubkey(),
&vote_account_pubkey,
&BankForks::new(0, Bank::default()),
&path,
);

bench.iter(move || {
tower.save(&node_keypair).unwrap();
});
}
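
The bench above only exercises the write path. A companion sketch for the read path might look like this (a hypothetical `bench_restore_tower`, mirroring the setup above; it saves once so each iteration has a file to read back):

#[bench]
fn bench_restore_tower(bench: &mut Bencher) {
    let dir = TempDir::new().unwrap();
    let path = dir.path();

    let vote_account_pubkey = &Pubkey::default();
    let node_keypair = Arc::new(Keypair::new());
    let tower = Tower::new(
        &node_keypair.pubkey(),
        &vote_account_pubkey,
        &BankForks::new(0, Bank::default()),
        &path,
    );

    // Persist once up front; each iteration then measures deserialization
    // plus signature verification of the saved tower.
    tower.save(&node_keypair).unwrap();

    bench.iter(move || {
        let _restored = Tower::restore(&path, &node_keypair.pubkey()).unwrap();
    });
}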
158 changes: 151 additions & 7 deletions core/src/consensus.rs
@@ -7,18 +7,25 @@ use solana_sdk::{
clock::{Slot, UnixTimestamp},
hash::Hash,
pubkey::Pubkey,
signature::{Keypair, Signature, Signer},
};
use solana_vote_program::vote_state::{
BlockTimestamp, Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY, TIMESTAMP_SLOT_INTERVAL,
};
use std::{
collections::{HashMap, HashSet},
fs::{self, File},
io::BufReader,
path::{Path, PathBuf},
sync::Arc,
};
use thiserror::Error;

pub const VOTE_THRESHOLD_DEPTH: usize = 8;
pub const VOTE_THRESHOLD_SIZE: f64 = 2f64 / 3f64;

pub type Result<T> = std::result::Result<T, TowerError>;

#[derive(Default, Debug, Clone)]
pub struct StakeLockout {
lockout: u64,
@@ -37,13 +44,16 @@ impl StakeLockout {
}
}

#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
pub struct Tower {
node_pubkey: Pubkey,
threshold_depth: usize,
threshold_size: f64,
lockouts: VoteState,
last_vote: Vote,
last_timestamp: BlockTimestamp,
#[serde(skip)]
save_path: PathBuf,
}

impl Default for Tower {
@@ -55,19 +65,28 @@ impl Default for Tower {
lockouts: VoteState::default(),
last_vote: Vote::default(),
last_timestamp: BlockTimestamp::default(),
save_path: PathBuf::default(),
}
}
}

impl Tower {
pub fn new(node_pubkey: &Pubkey, vote_account_pubkey: &Pubkey, bank_forks: &BankForks) -> Self {
let mut tower = Self::new_with_key(node_pubkey);

pub fn new(
node_pubkey: &Pubkey,
vote_account_pubkey: &Pubkey,
bank_forks: &BankForks,
save_path: &Path,
) -> Self {
let mut tower = Self {
node_pubkey: *node_pubkey,
save_path: PathBuf::from(save_path),
..Tower::default()
};
tower.initialize_lockouts_from_bank_forks(&bank_forks, vote_account_pubkey);

tower
}

#[cfg(test)]
pub fn new_with_key(node_pubkey: &Pubkey) -> Self {
Self {
node_pubkey: *node_pubkey,
@@ -273,6 +292,14 @@ impl Tower {
self.lockouts.root_slot
}

pub fn last_lockout_vote_slot(&self) -> Option<Slot> {
self.lockouts
.votes
.iter()
.max_by(|x, y| x.slot.cmp(&y.slot))
.map(|v| v.slot)
}

// a slot is not recent if it's older than the newest vote we have
pub fn is_recent(&self, slot: u64) -> bool {
if let Some(last_vote) = self.lockouts.votes.back() {
@@ -438,12 +465,12 @@
vote_account_pubkey: &Pubkey,
) {
if let Some(bank) = self.find_heaviest_bank(bank_forks) {
let root = bank_forks.root();
if let Some((_stake, vote_account)) = bank.vote_accounts().get(vote_account_pubkey) {
let root_slot = bank_forks.root();
let mut vote_state = VoteState::deserialize(&vote_account.data)
.expect("vote_account isn't a VoteState?");
vote_state.root_slot = Some(root);
vote_state.votes.retain(|v| v.slot > root);
vote_state.root_slot = Some(root_slot);
vote_state.votes.retain(|v| v.slot > root_slot);
trace!(
"{} lockouts initialized to {:?}",
self.node_pubkey,
@@ -455,6 +482,12 @@
"vote account's node_pubkey doesn't match",
);
self.lockouts = vote_state;
} else {
info!(
"vote account({}) not found in heaviest bank (slot={})",
vote_account_pubkey,
bank.slot()
);
}
}
}
@@ -473,6 +506,117 @@
None
}
}

fn get_filename(path: &Path, node_pubkey: &Pubkey) -> PathBuf {
PathBuf::from(path)
.join(format!("tower-{}", node_pubkey))
.with_extension("bin")
}

pub fn save(&self, node_keypair: &Arc<Keypair>) -> Result<()> {
assert_eq!(node_keypair.pubkey(), self.node_pubkey);

if self.node_pubkey != node_keypair.pubkey() {
return Err(TowerError::WrongTower(format!(
"node_pubkey is {:?} but found tower for {:?}",
node_keypair.pubkey(),
self.node_pubkey
)));
}

fs::create_dir_all(&self.save_path)?;
let filename = Self::get_filename(&self.save_path, &self.node_pubkey);
let new_filename = filename.with_extension("new");
{
let mut file = File::create(&new_filename)?;
let saveable_tower = SavedTower::new(self, node_keypair)?;
bincode::serialize_into(&mut file, &saveable_tower)?;
}
fs::rename(&new_filename, &filename)?;
Ok(())
}

pub fn restore(save_path: &Path, node_pubkey: &Pubkey) -> Result<Self> {
let filename = Self::get_filename(save_path, node_pubkey);

info!("Restoring tower from {:?}", filename);
let file = File::open(filename)?;
let mut stream = BufReader::new(file);

let saved_tower: SavedTower = bincode::deserialize_from(&mut stream)?;
if !saved_tower.verify(node_pubkey) {
return Err(TowerError::InvalidSignature);
}
let mut tower = saved_tower.deserialize()?;
tower.save_path = save_path.to_path_buf();

// check that the tower actually belongs to this node
if &tower.node_pubkey != node_pubkey {
return Err(TowerError::WrongTower(format!(
"node_pubkey is {:?} but found tower for {:?}",
node_pubkey, tower.node_pubkey
)));
}

Ok(tower)
}
}
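
Taken together, save() and restore() give a simple round trip. A hedged usage sketch (test-style; `tower_dir` is an assumed `tempfile::TempDir`, and the imports and single-bank `BankForks` setup mirror the new bench in core/benches/consensus.rs):

let tower_dir = TempDir::new().unwrap();
let node_keypair = Arc::new(Keypair::new());
let vote_account_pubkey = Pubkey::default();

let tower = Tower::new(
    &node_keypair.pubkey(),
    &vote_account_pubkey,
    &BankForks::new(0, Bank::default()),
    tower_dir.path(),
);

// save() writes tower-<node_pubkey>.bin under save_path, going through a
// temporary .new file plus a rename so a crash cannot leave a torn file.
tower.save(&node_keypair).unwrap();

// restore() checks the signature and the node pubkey before handing the
// tower back, and re-populates the #[serde(skip)] save_path field.
let restored = Tower::restore(tower_dir.path(), &node_keypair.pubkey()).unwrap();
assert_eq!(restored, tower);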

#[derive(Error, Debug)]
pub enum TowerError {
#[error("IO Error: {0}")]
IOError(#[from] std::io::Error),

#[error("Serialization Error: {0}")]
SerializeError(#[from] Box<bincode::ErrorKind>),

#[error("The signature on the saved tower is invalid")]
InvalidSignature,

#[error("The tower does not match this validator: {0}")]
WrongTower(String),
}
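
A caller can use these variants to tell "no tower has ever been saved" apart from a corrupt or foreign tower file. A hedged sketch (the helper name is hypothetical):

fn is_missing_tower_file(err: &TowerError) -> bool {
    // Only a not-found IO error means nothing was ever written; corrupt data,
    // a bad signature, or a mismatched pubkey should fail loudly instead.
    if let TowerError::IOError(io_err) = err {
        io_err.kind() == std::io::ErrorKind::NotFound
    } else {
        false
    }
}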

#[derive(Default, Clone, Serialize, Deserialize, Debug, PartialEq)]
pub struct SavedTower {
signature: Signature,
data: Vec<u8>,
}

impl SavedTower {
pub fn new<T: Signer>(tower: &Tower, keypair: &Arc<T>) -> Result<Self> {
let data = bincode::serialize(tower)?;
let signature = keypair.sign_message(&data);
Ok(Self { data, signature })
}

pub fn verify(&self, pubkey: &Pubkey) -> bool {
self.signature.verify(pubkey.as_ref(), &self.data)
}

pub fn deserialize(&self) -> Result<Tower> {
bincode::deserialize(&self.data).map_err(|e| e.into())
}
}
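
SavedTower is the on-disk envelope: the bincode bytes of the Tower plus the node identity's signature over exactly those bytes. A hedged sketch of the sign/verify flow (test-style; `new_with_key` is the #[cfg(test)] constructor shown earlier in this diff):

let node_keypair = Arc::new(Keypair::new());
let tower = Tower::new_with_key(&node_keypair.pubkey());

// Serialize the tower and sign the serialized bytes with the identity key.
let saved = SavedTower::new(&tower, &node_keypair).unwrap();

// Verification succeeds only for the signing pubkey, which is exactly what
// restore() relies on before trusting the file contents.
assert!(saved.verify(&node_keypair.pubkey()));
assert!(!saved.verify(&Pubkey::new_rand()));

// The payload deserializes back into a Tower; save_path is #[serde(skip)]
// and comes back as the default, so restore() fills it in afterwards.
let unpacked = saved.deserialize().unwrap();
assert_eq!(unpacked.node_pubkey, tower.node_pubkey);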

// Given an untimely crash, tower may have roots that are not reflected in blockstore because
// `ReplayStage::handle_votable_bank()` saves tower before setting blockstore roots
pub fn reconcile_blockstore_roots_with_tower(
tower: &Tower,
blockstore: &solana_ledger::blockstore::Blockstore,
) -> solana_ledger::blockstore_db::Result<()> {
if let Some(tower_root) = tower.root() {
let last_blockstore_root = blockstore.last_root();
if last_blockstore_root < tower_root {
let new_roots: Vec<_> = blockstore
.slot_meta_iterator(last_blockstore_root + 1)?
.map(|(slot, _)| slot)
.take_while(|slot| *slot <= tower_root)
.collect();
blockstore.set_roots(&new_roots)?
}
}
Ok(())
}
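
A hedged sketch of how restore() and this reconciliation are meant to compose at startup (the call site is not in the portion of the diff shown here; `tower_path`, `node_pubkey`, `vote_account_pubkey`, `bank_forks`, and `blockstore` are assumed bindings):

// Pick up the tower persisted by a previous run, or fall back to a fresh one.
let tower = Tower::restore(&tower_path, &node_pubkey).unwrap_or_else(|err| {
    info!("Failed to restore tower ({}); building a new one", err);
    Tower::new(&node_pubkey, &vote_account_pubkey, &bank_forks, &tower_path)
});

// Per the comment above: the tower is saved before blockstore roots are set,
// so after a crash the blockstore may lag the tower. Roll its roots forward
// before replay begins.
reconcile_blockstore_roots_with_tower(&tower, &blockstore)
    .expect("Failed to reconcile blockstore with tower");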

#[cfg(test)]
14 changes: 11 additions & 3 deletions core/src/replay_stage.rs
@@ -117,6 +117,7 @@ impl ReplayStage {
cluster_info: Arc<ClusterInfo>,
ledger_signal_receiver: Receiver<bool>,
poh_recorder: Arc<Mutex<PohRecorder>>,
mut tower: Tower,
vote_tracker: Arc<VoteTracker>,
cluster_slots: Arc<ClusterSlots>,
retransmit_slots_sender: RetransmitSlotsSender,
@@ -138,7 +139,6 @@

let (root_bank_sender, root_bank_receiver) = channel();
trace!("replay stage");
let mut tower = Tower::new(&my_pubkey, &vote_account, &bank_forks.read().unwrap());

// Start the replay stage loop
let (lockouts_sender, commitment_service) =
@@ -773,7 +773,15 @@
}
trace!("handle votable bank {}", bank.slot());
let (vote, tower_index) = tower.new_vote_from_bank(bank, vote_account_pubkey);
if let Some(new_root) = tower.record_bank_vote(vote) {
let new_root = tower.record_bank_vote(vote);
let last_vote = tower.last_vote_and_timestamp();

if let Err(err) = tower.save(&cluster_info.keypair) {
error!("Unable to save tower: {:?}", err);
std::process::exit(1);
}

if let Some(new_root) = new_root {
// get the root bank before squash
let root_bank = bank_forks
.read()
@@ -831,7 +839,7 @@
bank,
vote_account_pubkey,
authorized_voter_keypairs,
tower.last_vote_and_timestamp(),
last_vote,
tower_index,
);
Ok(())
21 changes: 15 additions & 6 deletions core/src/tvu.rs
@@ -9,6 +9,7 @@ use crate::{
cluster_info_vote_listener::VoteTracker,
cluster_slots::ClusterSlots,
commitment::BlockCommitmentCache,
consensus::Tower,
ledger_cleanup_service::LedgerCleanupService,
poh_recorder::PohRecorder,
replay_stage::{ReplayStage, ReplayStageConfig},
@@ -21,19 +22,19 @@
storage_stage::{StorageStage, StorageState},
};
use crossbeam_channel::unbounded;
use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
use solana_ledger::{
bank_forks::BankForks,
blockstore::{Blockstore, CompletedSlotsReceiver},
blockstore_processor::TransactionStatusSender,
leader_schedule_cache::LeaderScheduleCache,
snapshot_package::AccountsPackageSender,
};
use solana_sdk::{
pubkey::Pubkey,
signature::{Keypair, Signer},
};
use std::collections::HashSet;
use std::{
collections::HashSet,
net::UdpSocket,
sync::{
atomic::AtomicBool,
@@ -91,6 +92,7 @@ impl Tvu {
ledger_signal_receiver: Receiver<bool>,
subscriptions: &Arc<RpcSubscriptions>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
tower: Tower,
leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: &Arc<AtomicBool>,
completed_slots_receiver: CompletedSlotsReceiver,
@@ -204,6 +206,7 @@
cluster_info.clone(),
ledger_signal_receiver,
poh_recorder.clone(),
tower,
vote_tracker,
cluster_slots,
retransmit_slots_sender,
@@ -263,11 +266,15 @@
#[cfg(test)]
pub mod tests {
use super::*;
use crate::banking_stage::create_test_recorder;
use crate::cluster_info::{ClusterInfo, Node};
use crate::{
banking_stage::create_test_recorder,
cluster_info::{ClusterInfo, Node},
};
use serial_test_derive::serial;
use solana_ledger::create_new_tmp_ledger;
use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
use solana_ledger::{
create_new_tmp_ledger,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
};
use solana_runtime::bank::Bank;
use std::sync::atomic::Ordering;

@@ -304,6 +311,7 @@ pub mod tests {
BlockCommitmentCache::default_with_blockstore(blockstore.clone()),
));
let (retransmit_slots_sender, _retransmit_slots_receiver) = unbounded();
let tower = Tower::new_with_key(&target1_keypair.pubkey());
let tvu = Tvu::new(
&vote_keypair.pubkey(),
vec![Arc::new(vote_keypair)],
@@ -323,6 +331,7 @@
l_receiver,
&Arc::new(RpcSubscriptions::new(&exit, block_commitment_cache.clone())),
&poh_recorder,
tower,
&leader_schedule_cache,
&exit,
completed_slots_receiver,
(Diffs for the remaining 6 changed files are not shown.)
