Skip to content

Commit

Permalink
Be able to create bank snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge authored and sambley committed Apr 17, 2019
1 parent 7ec198b commit 2333686
Show file tree
Hide file tree
Showing 29 changed files with 3,019 additions and 54 deletions.
156 changes: 154 additions & 2 deletions core/src/bank_forks.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
//! The `bank_forks` module implments BankForks a DAG of checkpointed Banks
use hashbrown::{HashMap, HashSet};
use bincode::{deserialize_from, serialize_into};
use solana_metrics::counter::Counter;
use solana_runtime::bank::Bank;
use solana_sdk::timing;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::fs::File;
use std::io::{BufReader, BufWriter, Error, ErrorKind};
use std::ops::Index;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;

Expand Down Expand Up @@ -119,14 +124,72 @@ impl BankForks {
self.banks
.retain(|slot, bank| *slot >= root || bank.is_in_subtree_of(root))
}

fn get_io_error(error: &str) -> Error {
Error::new(ErrorKind::Other, error)
}

pub fn save_snapshot(&self, path: &str) -> Result<(), Error> {
let mut bank_index: HashMap<u64, PathBuf> = HashMap::new();
let path = Path::new(&path);
fs::create_dir_all(path)?;
for (slot, bank) in &self.frozen_banks() {
let bank_file = format!("bank-{}.snapshot", slot);
let bank_file_path = path.join(bank_file);
bank_index.insert(*slot, bank_file_path.clone());
let file = File::create(bank_file_path)?;
let mut stream = BufWriter::new(file);
serialize_into(&mut stream, &bank)
.map_err(|_| BankForks::get_io_error("serialize bank error"))?;
}
let index_path = Path::new(&path).join("bank.index");
fs::create_dir_all(path)?;
{
let file = File::create(index_path)?;
let mut stream = BufWriter::new(file);
serialize_into(&mut stream, &bank_index)
.map_err(|_| BankForks::get_io_error("serialize bank error"))?;
}
Ok(())
}

pub fn load_from_snapshot(path: &str) -> Result<Self, Error> {
let bank_index = {
let path = Path::new(path).join("bank.index");
let index_file = File::open(path)?;
let mut stream = BufReader::new(index_file);
let index: HashMap<u64, PathBuf> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize snapshot index error"))?;
index
};

let mut banks: HashMap<u64, Arc<Bank>> = HashMap::new();
for (slot, bank_path) in &bank_index {
let file = File::open(bank_path)?;
let mut stream = BufReader::new(file);
let bank: Bank = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize bank error"))?;
banks.insert(*slot, Arc::new(bank));
}
let working_bank = banks[&0].clone();
Ok(BankForks {
banks,
working_bank,
})
}
}

#[cfg(test)]
mod tests {
use super::*;
use bincode::{deserialize, serialize};
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use solana_sdk::system_transaction;
use std::env;
use std::fs::remove_dir_all;

#[test]
fn test_bank_forks() {
Expand All @@ -152,7 +215,7 @@ mod tests {
bank_forks.insert(bank);
let descendants = bank_forks.descendants();
let children: Vec<u64> = descendants[&0].iter().cloned().collect();
assert_eq!(children, vec![1, 2]);
assert_eq!(children, vec![2, 1]);
assert!(descendants[&1].is_empty());
assert!(descendants[&2].is_empty());
}
Expand Down Expand Up @@ -196,4 +259,93 @@ mod tests {
assert_eq!(bank_forks.active_banks(), vec![1]);
}

struct TempPaths {
pub paths: String,
}

#[macro_export]
macro_rules! tmp_bank_accounts_name {
() => {
&format!("{}-{}", file!(), line!())
};
}

#[macro_export]
macro_rules! get_tmp_bank_accounts_path {
() => {
get_tmp_bank_accounts_path(tmp_bank_accounts_name!())
};
}

impl Drop for TempPaths {
fn drop(&mut self) {
let paths: Vec<String> = self.paths.split(',').map(|s| s.to_string()).collect();
paths.iter().for_each(|p| {
let _ignored = remove_dir_all(p);
});
}
}

fn get_paths_vec(paths: &str) -> Vec<String> {
paths.split(',').map(|s| s.to_string()).collect()
}

fn get_tmp_bank_accounts_path(paths: &str) -> TempPaths {
let vpaths = get_paths_vec(paths);
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let vpaths: Vec<_> = vpaths
.iter()
.map(|path| format!("{}/{}", out_dir, path))
.collect();
TempPaths {
paths: vpaths.join(","),
}
}

fn save_and_load_snapshot(bank_forks: &BankForks) {
let bank = bank_forks.banks.get(&0).unwrap();
let tick_height = bank.tick_height();
let bank_ser = serialize(&bank).unwrap();
let child_bank = bank_forks.banks.get(&1).unwrap();
let child_bank_ser = serialize(&child_bank).unwrap();
let snapshot_path = "snapshots";
bank_forks.save_snapshot(snapshot_path).unwrap();
drop(bank_forks);

let new = BankForks::load_from_snapshot(snapshot_path).unwrap();
assert_eq!(new[0].tick_height(), tick_height);
let bank: Bank = deserialize(&bank_ser).unwrap();
let new_bank = new.banks.get(&0).unwrap();
bank.compare_bank(&new_bank);
let child_bank: Bank = deserialize(&child_bank_ser).unwrap();
let new_bank = new.banks.get(&1).unwrap();
child_bank.compare_bank(&new_bank);
drop(new);
let _ = fs::remove_dir_all(snapshot_path);
}

#[test]
fn test_bank_forks_snapshot_n() {
solana_logger::setup();
let path = get_tmp_bank_accounts_path!();
let (genesis_block, mint_keypair) = GenesisBlock::new(10_000);
let bank0 = Bank::new_with_paths(&genesis_block, Some(path.paths.clone()));
bank0.freeze();
let mut bank_forks = BankForks::new(0, bank0);
for index in 0..10 {
let bank = Bank::new_from_parent(&bank_forks[index], &Pubkey::default(), index + 1);
let key1 = Keypair::new().pubkey();
let tx = system_transaction::create_user_account(
&mint_keypair,
&key1,
1,
genesis_block.hash(),
0,
);
assert_eq!(bank.process_transaction(&tx), Ok(()));
bank.freeze();
bank_forks.insert(bank);
save_and_load_snapshot(&bank_forks);
}
}
}
2 changes: 1 addition & 1 deletion core/src/blocktree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use solana_kvstore as kvstore;

use bincode::deserialize;

use hashbrown::HashMap;
use std::collections::HashMap;

#[cfg(not(feature = "kvstore"))]
use rocksdb;
Expand Down
2 changes: 1 addition & 1 deletion core/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use crate::staking_utils;
use crate::streamer::{BlobReceiver, BlobSender};
use bincode::{deserialize, serialize};
use core::cmp;
use hashbrown::HashMap;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use solana_metrics::counter::Counter;
Expand All @@ -41,6 +40,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature};
use solana_sdk::timing::{duration_as_ms, timestamp};
use solana_sdk::transaction::Transaction;
use std::cmp::min;
use std::collections::HashMap;
use std::fmt;
use std::io;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
Expand Down
2 changes: 1 addition & 1 deletion core/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_gossip_pull::CrdsGossipPull;
use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE};
use crate::crds_value::CrdsValue;
use hashbrown::HashMap;
use solana_runtime::bloom::Bloom;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;

///The min size for bloom filters
pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000;
Expand Down
2 changes: 1 addition & 1 deletion core/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_value::{CrdsValue, CrdsValueLabel};
use crate::packet::BLOB_DATA_SIZE;
use bincode::serialized_size;
use hashbrown::HashMap;
use rand;
use rand::distributions::{Distribution, WeightedIndex};
use solana_runtime::bloom::Bloom;
use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use std::cmp;
use std::collections::HashMap;
use std::collections::VecDeque;

pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000;
Expand Down
2 changes: 1 addition & 1 deletion core/src/crds_gossip_push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use crate::crds_gossip_error::CrdsGossipError;
use crate::crds_value::{CrdsValue, CrdsValueLabel};
use crate::packet::BLOB_DATA_SIZE;
use bincode::serialized_size;
use hashbrown::HashMap;
use indexmap::map::IndexMap;
use rand;
use rand::distributions::{Distribution, WeightedIndex};
Expand All @@ -25,6 +24,7 @@ use solana_sdk::hash::Hash;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use std::cmp;
use std::collections::HashMap;

pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30;
pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6;
Expand Down
17 changes: 14 additions & 3 deletions core/src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct FullnodeConfig {
pub tick_config: PohServiceConfig,
pub account_paths: Option<String>,
pub rpc_config: JsonRpcConfig,
pub use_snapshot: bool,
}
impl Default for FullnodeConfig {
fn default() -> Self {
Expand All @@ -60,6 +61,7 @@ impl Default for FullnodeConfig {
tick_config: PohServiceConfig::default(),
account_paths: None,
rpc_config: JsonRpcConfig::default(),
use_snapshot: false,
}
}
}
Expand Down Expand Up @@ -95,7 +97,11 @@ impl Fullnode {
assert_eq!(id, node.info.id);

let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) =
new_banks_from_blocktree(ledger_path, config.account_paths.clone());
new_banks_from_blocktree(
ledger_path,
config.account_paths.clone(),
config.use_snapshot,
);

let exit = Arc::new(AtomicBool::new(false));
let bank_info = &bank_forks_info[0];
Expand Down Expand Up @@ -275,16 +281,21 @@ impl Fullnode {
pub fn new_banks_from_blocktree(
blocktree_path: &str,
account_paths: Option<String>,
use_snapshot: bool,
) -> (BankForks, Vec<BankForksInfo>, Blocktree, Receiver<bool>) {
let genesis_block =
GenesisBlock::load(blocktree_path).expect("Expected to successfully open genesis block");

let (blocktree, ledger_signal_receiver) = Blocktree::open_with_signal(blocktree_path)
.expect("Expected to successfully open database ledger");

let (bank_forks, bank_forks_info) =
let (bank_forks, bank_forks_info) = if use_snapshot {
let bank_forks = BankForks::load_from_snapshot("bank-snapshot").unwrap();
(bank_forks, vec![])
} else {
blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths)
.expect("process_blocktree failed");
.expect("process_blocktree failed")
};

(
bank_forks,
Expand Down
2 changes: 1 addition & 1 deletion core/src/locktower.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::bank_forks::BankForks;
use crate::staking_utils;
use hashbrown::{HashMap, HashSet};
use solana_metrics::influxdb;
use solana_runtime::bank::Bank;
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use solana_vote_api::vote_state::{Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

pub const VOTE_THRESHOLD_DEPTH: usize = 8;
Expand Down
4 changes: 2 additions & 2 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::poh_recorder::PohRecorder;
use crate::result::{Error, Result};
use crate::rpc_subscriptions::RpcSubscriptions;
use crate::service::Service;
use hashbrown::HashMap;
use solana_metrics::counter::Counter;
use solana_metrics::influxdb;
use solana_runtime::bank::Bank;
Expand All @@ -22,6 +21,7 @@ use solana_sdk::signature::KeypairUtil;
use solana_sdk::timing::{self, duration_as_ms};
use solana_sdk::transaction::Transaction;
use solana_vote_api::vote_instruction;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex, RwLock};
Expand Down Expand Up @@ -637,7 +637,7 @@ mod test {
{
let voting_keypair = Arc::new(Keypair::new());
let (bank_forks, _bank_forks_info, blocktree, l_receiver) =
new_banks_from_blocktree(&my_ledger_path, None);
new_banks_from_blocktree(&my_ledger_path, None, false);
let bank = bank_forks.working_bank();

let blocktree = Arc::new(blocktree);
Expand Down
6 changes: 3 additions & 3 deletions core/src/staking_utils.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use hashbrown::HashMap;
use solana_runtime::bank::Bank;
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use solana_vote_api::vote_state::VoteState;
use std::borrow::Borrow;
use std::collections::HashMap;

/// Looks through vote accounts, and finds the latest slot that has achieved
/// supermajority lockout
Expand Down Expand Up @@ -66,7 +66,7 @@ pub fn node_staked_accounts_at_epoch(
) -> Option<impl Iterator<Item = (&Pubkey, u64, &Account)>> {
bank.epoch_vote_accounts(epoch_height).map(|epoch_state| {
epoch_state
.into_iter()
.iter()
.filter_map(|(account_id, account)| {
filter_zero_balances(account).map(|stake| (account_id, stake, account))
})
Expand Down Expand Up @@ -149,10 +149,10 @@ where
mod tests {
use super::*;
use crate::voting_keypair::tests as voting_keypair_tests;
use hashbrown::HashSet;
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::{Keypair, KeypairUtil};
use std::collections::HashSet;
use std::iter::FromIterator;
use std::sync::Arc;

Expand Down
2 changes: 1 addition & 1 deletion core/tests/cluster_info.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use hashbrown::{HashMap, HashSet};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rayon::prelude::*;
use solana::cluster_info::{
Expand All @@ -7,6 +6,7 @@ use solana::cluster_info::{
};
use solana::contact_info::ContactInfo;
use solana_sdk::pubkey::Pubkey;
use std::collections::{HashMap, HashSet};
use std::sync::mpsc::channel;
use std::sync::mpsc::TryRecvError;
use std::sync::mpsc::{Receiver, Sender};
Expand Down
Loading

0 comments on commit 2333686

Please sign in to comment.