Skip to content

Commit

Permalink
load snapshot on start
Browse files Browse the repository at this point in the history
  • Loading branch information
sambley committed Apr 24, 2019
1 parent 2f42036 commit 0fa5b6e
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 50 deletions.
122 changes: 83 additions & 39 deletions core/src/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use solana_metrics::counter::Counter;
use solana_runtime::bank::Bank;
use solana_sdk::timing;
use std::collections::{HashMap, HashSet};
use std::env;
use std::fs;
use std::fs::File;
use std::io::{BufReader, BufWriter, Error, ErrorKind};
Expand All @@ -16,6 +17,8 @@ use std::time::Instant;
pub struct BankForks {
banks: HashMap<u64, Arc<Bank>>,
working_bank: Arc<Bank>,
slots: HashSet<u64>,
use_snapshot: bool,
}

impl Index<u64> for BankForks {
Expand All @@ -33,6 +36,8 @@ impl BankForks {
Self {
banks,
working_bank,
slots: HashSet::new(),
use_snapshot: false,
}
}

Expand Down Expand Up @@ -91,6 +96,8 @@ impl BankForks {
Self {
banks,
working_bank,
slots: HashSet::new(),
use_snapshot: false,
}
}

Expand Down Expand Up @@ -122,60 +129,94 @@ impl BankForks {
}

fn prune_non_root(&mut self, root: u64) {
let slots: HashSet<u64> = self
.banks
.iter()
.filter(|(_, b)| b.is_frozen())
.map(|(k, _)| *k)
.collect();
self.banks
.retain(|slot, bank| *slot >= root || bank.is_in_subtree_of(root))
.retain(|slot, bank| *slot >= root || bank.is_in_subtree_of(root));
let diff: HashSet<_> = slots.symmetric_difference(&self.slots).collect();
for slot in diff.iter() {
if **slot > root {
let _ = self.add_snapshot(**slot);
} else {
self.remove_snapshot(**slot);
}
}
self.slots = slots.clone();
}

fn get_io_error(error: &str) -> Error {
Error::new(ErrorKind::Other, error)
}

pub fn save_snapshot(&self, path: &str) -> Result<(), Error> {
let mut bank_index: HashMap<u64, PathBuf> = HashMap::new();
let path = Path::new(&path);
fs::create_dir_all(path)?;
for (slot, bank) in &self.frozen_banks() {
let bank_file = format!("bank-{}.snapshot", slot);
let bank_file_path = path.join(bank_file);
bank_index.insert(*slot, bank_file_path.clone());
let file = File::create(bank_file_path)?;
let mut stream = BufWriter::new(file);
serialize_into(&mut stream, &bank)
.map_err(|_| BankForks::get_io_error("serialize bank error"))?;
}
let index_path = Path::new(&path).join("bank.index");
fs::create_dir_all(path)?;
{
let file = File::create(index_path)?;
let mut stream = BufWriter::new(file);
serialize_into(&mut stream, &bank_index)
.map_err(|_| BankForks::get_io_error("serialize bank error"))?;
}
fn get_snapshot_path() -> PathBuf {
let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string());
let snapshot_dir = format!("{}/snapshots/", out_dir);
Path::new(&snapshot_dir).to_path_buf()
}

pub fn add_snapshot(&self, slot: u64) -> Result<(), Error> {
let path = BankForks::get_snapshot_path();
fs::create_dir_all(path.clone())?;
let bank_file = format!("{}", slot);
let bank_file_path = path.join(bank_file);
let file = File::create(bank_file_path)?;
let mut stream = BufWriter::new(file);
serialize_into(&mut stream, self.get(slot).unwrap())
.map_err(|_| BankForks::get_io_error("serialize bank error"))?;
Ok(())
}

pub fn load_from_snapshot(path: &str) -> Result<Self, Error> {
let bank_index = {
let path = Path::new(path).join("bank.index");
let index_file = File::open(path)?;
let mut stream = BufReader::new(index_file);
let index: HashMap<u64, PathBuf> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize snapshot index error"))?;
index
};
pub fn remove_snapshot(&self, slot: u64) {
let path = BankForks::get_snapshot_path();
let bank_file = format!("{}", slot);
let bank_file_path = path.join(bank_file);
let _ = fs::remove_file(bank_file_path);
}

pub fn set_snapshot_config(&mut self, use_snapshot: bool) {
self.use_snapshot = use_snapshot;
}

pub fn load_from_snapshot() -> Result<Self, Error> {
let path = BankForks::get_snapshot_path();
let paths = fs::read_dir(path.clone())?;
let names = paths
.filter_map(|entry| {
entry.ok().and_then(|e| {
e.path()
.file_name()
.and_then(|n| n.to_str().map(String::from))
})
})
.collect::<Vec<String>>();

let mut banks: HashMap<u64, Arc<Bank>> = HashMap::new();
for (slot, bank_path) in &bank_index {
let file = File::open(bank_path)?;
let mut slots = HashSet::new();
let mut last_slot: u64 = 0;
for bank_path in names {
let bank_file_path = path.join(bank_path.clone());
info!("Load from {:?}", bank_file_path);
let file = File::open(bank_file_path)?;
let mut stream = BufReader::new(file);
let bank: Bank = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize bank error"))?;
banks.insert(*slot, Arc::new(bank));
let slot = bank_path.parse::<u64>().unwrap();
banks.insert(slot, Arc::new(bank));
slots.insert(slot);
if slot > last_slot {
last_slot = slot;
}
}
let working_bank = banks[&0].clone();
let working_bank = banks[&last_slot].clone();
Ok(BankForks {
banks,
working_bank,
slots,
use_snapshot: true,
})
}
}
Expand Down Expand Up @@ -309,20 +350,23 @@ mod tests {
let bank_ser = serialize(&bank).unwrap();
let child_bank = bank_forks.banks.get(&1).unwrap();
let child_bank_ser = serialize(&child_bank).unwrap();
let snapshot_path = "snapshots";
bank_forks.save_snapshot(snapshot_path).unwrap();
for (slot, _) in bank_forks.banks.iter() {
bank_forks.add_snapshot(*slot).unwrap();
}
drop(bank_forks);

let new = BankForks::load_from_snapshot(snapshot_path).unwrap();
let new = BankForks::load_from_snapshot().unwrap();
assert_eq!(new[0].tick_height(), tick_height);
let bank: Bank = deserialize(&bank_ser).unwrap();
let new_bank = new.banks.get(&0).unwrap();
bank.compare_bank(&new_bank);
let child_bank: Bank = deserialize(&child_bank_ser).unwrap();
let new_bank = new.banks.get(&1).unwrap();
child_bank.compare_bank(&new_bank);
for (slot, _) in new.banks.iter() {
new.remove_snapshot(*slot);
}
drop(new);
let _ = fs::remove_dir_all(snapshot_path);
}

#[test]
Expand Down
36 changes: 27 additions & 9 deletions core/src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ impl Fullnode {
assert_eq!(id, node.info.id);

let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver, leader_schedule_cache) =
new_banks_from_blocktree(ledger_path, config.account_paths.clone(), config.use_snapshot);
new_banks_from_blocktree(
ledger_path,
config.account_paths.clone(),
config.use_snapshot,
);

let leader_schedule_cache = Arc::new(leader_schedule_cache);
let exit = Arc::new(AtomicBool::new(false));
Expand Down Expand Up @@ -287,6 +291,26 @@ impl Fullnode {
}
}

fn get_bank_forks(
genesis_block: &GenesisBlock,
blocktree: &Blocktree,
account_paths: Option<String>,
use_snapshot: bool,
) -> (BankForks, Vec<BankForksInfo>, LeaderScheduleCache) {
if use_snapshot {
let bank_forks = BankForks::load_from_snapshot();
match bank_forks {
Ok(v) => {
let bank = &v.working_bank();
return (v, vec![], LeaderScheduleCache::new_from_bank(bank));
}
Err(_) => warn!("Failed to load from snapshot, fallback to load from ledger"),
}
}
blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths)
.expect("process_blocktree failed")
}

pub fn new_banks_from_blocktree(
blocktree_path: &str,
account_paths: Option<String>,
Expand All @@ -304,14 +328,8 @@ pub fn new_banks_from_blocktree(
let (blocktree, ledger_signal_receiver) = Blocktree::open_with_signal(blocktree_path)
.expect("Expected to successfully open database ledger");

let (bank_forks, bank_forks_info, leader_schedule_cache) = if use_snapshot {
let bank_forks = BankForks::load_from_snapshot("bank-snapshot").unwrap();
let bank = &bank_forks.working_bank();
(bank_forks, vec![], LeaderScheduleCache::new_from_bank(bank))
} else {
blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths)
.expect("process_blocktree failed")
};
let (bank_forks, bank_forks_info, leader_schedule_cache) =
get_bank_forks(&genesis_block, &blocktree, account_paths, use_snapshot);

(
bank_forks,
Expand Down
10 changes: 9 additions & 1 deletion fullnode/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,13 @@ fn main() {
.validator(port_range_validator)
.help("Range to use for dynamically assigned ports"),
)
.get_matches();
.arg(
clap::Arg::with_name("use_snapshot")
.long("use-snapshot")
.takes_value(false)
.help("Load / Store bank snapshots"),
)
.get_matches();

let mut fullnode_config = FullnodeConfig::default();
let keypair = if let Some(identity) = matches.value_of("identity") {
Expand Down Expand Up @@ -183,6 +189,8 @@ fn main() {

fullnode_config.sigverify_disabled = matches.is_present("no_sigverify");

fullnode_config.use_snapshot = matches.is_present("use_snapshot");

fullnode_config.voting_disabled = matches.is_present("no_voting");

if matches.is_present("enable_rpc_exit") {
Expand Down
3 changes: 2 additions & 1 deletion runtime/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ pub struct Accounts {

impl Drop for Accounts {
fn drop(&mut self) {
if self.own_paths {
if self.own_paths && (Arc::strong_count(&self.accounts_db) == 1) {
let paths = get_paths_vec(&self.paths);
paths.iter().for_each(|p| {
info!("drop remove {:?}", p);
let _ignored = remove_dir_all(p);

// it is safe to delete the parent
Expand Down

0 comments on commit 0fa5b6e

Please sign in to comment.