From 333eaf19f54fc8a88a3d71ee0c732032570ddb9b Mon Sep 17 00:00:00 2001 From: Sathish Ambley Date: Tue, 23 Apr 2019 22:27:16 -0700 Subject: [PATCH] load snapshot on start --- core/src/bank_forks.rs | 122 +++++++++++++++++++++++++++------------- core/src/fullnode.rs | 36 +++++++++--- fullnode/src/main.rs | 10 +++- runtime/src/accounts.rs | 3 +- 4 files changed, 121 insertions(+), 50 deletions(-) diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index bc19236ad82a60..df6cb0e095162e 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -5,6 +5,7 @@ use solana_metrics::counter::Counter; use solana_runtime::bank::Bank; use solana_sdk::timing; use std::collections::{HashMap, HashSet}; +use std::env; use std::fs; use std::fs::File; use std::io::{BufReader, BufWriter, Error, ErrorKind}; @@ -16,6 +17,8 @@ use std::time::Instant; pub struct BankForks { banks: HashMap>, working_bank: Arc, + slots: HashSet, + use_snapshot: bool, } impl Index for BankForks { @@ -33,6 +36,8 @@ impl BankForks { Self { banks, working_bank, + slots: HashSet::new(), + use_snapshot: false, } } @@ -91,6 +96,8 @@ impl BankForks { Self { banks, working_bank, + slots: HashSet::new(), + use_snapshot: false, } } @@ -122,60 +129,94 @@ impl BankForks { } fn prune_non_root(&mut self, root: u64) { + let slots: HashSet = self + .banks + .iter() + .filter(|(_, b)| b.is_frozen()) + .map(|(k, _)| *k) + .collect(); self.banks - .retain(|slot, bank| *slot >= root || bank.is_in_subtree_of(root)) + .retain(|slot, bank| *slot >= root || bank.is_in_subtree_of(root)); + let diff: HashSet<_> = slots.symmetric_difference(&self.slots).collect(); + for slot in diff.iter() { + if **slot > root { + let _ = self.add_snapshot(**slot); + } else { + self.remove_snapshot(**slot); + } + } + self.slots = slots.clone(); } fn get_io_error(error: &str) -> Error { Error::new(ErrorKind::Other, error) } - pub fn save_snapshot(&self, path: &str) -> Result<(), Error> { - let mut bank_index: HashMap = HashMap::new(); - let path = Path::new(&path); - fs::create_dir_all(path)?; - for (slot, bank) in &self.frozen_banks() { - let bank_file = format!("bank-{}.snapshot", slot); - let bank_file_path = path.join(bank_file); - bank_index.insert(*slot, bank_file_path.clone()); - let file = File::create(bank_file_path)?; - let mut stream = BufWriter::new(file); - serialize_into(&mut stream, &bank) - .map_err(|_| BankForks::get_io_error("serialize bank error"))?; - } - let index_path = Path::new(&path).join("bank.index"); - fs::create_dir_all(path)?; - { - let file = File::create(index_path)?; - let mut stream = BufWriter::new(file); - serialize_into(&mut stream, &bank_index) - .map_err(|_| BankForks::get_io_error("serialize bank error"))?; - } + fn get_snapshot_path() -> PathBuf { + let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); + let snapshot_dir = format!("{}/snapshots/", out_dir); + Path::new(&snapshot_dir).to_path_buf() + } + + pub fn add_snapshot(&self, slot: u64) -> Result<(), Error> { + let path = BankForks::get_snapshot_path(); + fs::create_dir_all(path.clone())?; + let bank_file = format!("{}", slot); + let bank_file_path = path.join(bank_file); + let file = File::create(bank_file_path)?; + let mut stream = BufWriter::new(file); + serialize_into(&mut stream, self.get(slot).unwrap()) + .map_err(|_| BankForks::get_io_error("serialize bank error"))?; Ok(()) } - pub fn load_from_snapshot(path: &str) -> Result { - let bank_index = { - let path = Path::new(path).join("bank.index"); - let index_file = File::open(path)?; - let mut stream = BufReader::new(index_file); - let index: HashMap = deserialize_from(&mut stream) - .map_err(|_| BankForks::get_io_error("deserialize snapshot index error"))?; - index - }; + pub fn remove_snapshot(&self, slot: u64) { + let path = BankForks::get_snapshot_path(); + let bank_file = format!("{}", slot); + let bank_file_path = path.join(bank_file); + let _ = fs::remove_file(bank_file_path); + } + + pub fn set_snapshot_config(&mut self, use_snapshot: bool) { + self.use_snapshot = use_snapshot; + } + + pub fn load_from_snapshot() -> Result { + let path = BankForks::get_snapshot_path(); + let paths = fs::read_dir(path.clone())?; + let names = paths + .filter_map(|entry| { + entry.ok().and_then(|e| { + e.path() + .file_name() + .and_then(|n| n.to_str().map(String::from)) + }) + }) + .collect::>(); let mut banks: HashMap> = HashMap::new(); - for (slot, bank_path) in &bank_index { - let file = File::open(bank_path)?; + let mut slots = HashSet::new(); + let mut last_slot: u64 = 0; + for bank_path in names { + let bank_file_path = path.join(bank_path.clone()); + info!("Load from {:?}", bank_file_path); + let file = File::open(bank_file_path)?; let mut stream = BufReader::new(file); let bank: Bank = deserialize_from(&mut stream) .map_err(|_| BankForks::get_io_error("deserialize bank error"))?; - banks.insert(*slot, Arc::new(bank)); + let slot = bank_path.parse::().unwrap(); + banks.insert(slot, Arc::new(bank)); + slots.insert(slot); + if slot > last_slot { + last_slot = slot; + } } - let working_bank = banks[&0].clone(); + let working_bank = banks[&last_slot].clone(); Ok(BankForks { banks, working_bank, + slots, + use_snapshot: true, }) } } @@ -309,11 +350,12 @@ mod tests { let bank_ser = serialize(&bank).unwrap(); let child_bank = bank_forks.banks.get(&1).unwrap(); let child_bank_ser = serialize(&child_bank).unwrap(); - let snapshot_path = "snapshots"; - bank_forks.save_snapshot(snapshot_path).unwrap(); + for (slot, _) in bank_forks.banks.iter() { + bank_forks.add_snapshot(*slot).unwrap(); + } drop(bank_forks); - let new = BankForks::load_from_snapshot(snapshot_path).unwrap(); + let new = BankForks::load_from_snapshot().unwrap(); assert_eq!(new[0].tick_height(), tick_height); let bank: Bank = deserialize(&bank_ser).unwrap(); let new_bank = new.banks.get(&0).unwrap(); @@ -321,8 +363,10 @@ mod tests { let child_bank: Bank = deserialize(&child_bank_ser).unwrap(); let new_bank = new.banks.get(&1).unwrap(); child_bank.compare_bank(&new_bank); + for (slot, _) in new.banks.iter() { + new.remove_snapshot(*slot); + } drop(new); - let _ = fs::remove_dir_all(snapshot_path); } #[test] diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index 5139485e0ef416..ce6e3e9aafd64f 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -98,7 +98,11 @@ impl Fullnode { assert_eq!(id, node.info.id); let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver, leader_schedule_cache) = - new_banks_from_blocktree(ledger_path, config.account_paths.clone(), config.use_snapshot); + new_banks_from_blocktree( + ledger_path, + config.account_paths.clone(), + config.use_snapshot, + ); let leader_schedule_cache = Arc::new(leader_schedule_cache); let exit = Arc::new(AtomicBool::new(false)); @@ -285,6 +289,26 @@ impl Fullnode { } } +fn get_bank_forks( + genesis_block: &GenesisBlock, + blocktree: &Blocktree, + account_paths: Option, + use_snapshot: bool, +) -> (BankForks, Vec, LeaderScheduleCache) { + if use_snapshot { + let bank_forks = BankForks::load_from_snapshot(); + match bank_forks { + Ok(v) => { + let bank = &v.working_bank(); + return (v, vec![], LeaderScheduleCache::new_from_bank(bank)); + } + Err(_) => warn!("Failed to load from snapshot, fallback to load from ledger"), + } + } + blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths) + .expect("process_blocktree failed") +} + pub fn new_banks_from_blocktree( blocktree_path: &str, account_paths: Option, @@ -302,14 +326,8 @@ pub fn new_banks_from_blocktree( let (blocktree, ledger_signal_receiver) = Blocktree::open_with_signal(blocktree_path) .expect("Expected to successfully open database ledger"); - let (bank_forks, bank_forks_info, leader_schedule_cache) = if use_snapshot { - let bank_forks = BankForks::load_from_snapshot("bank-snapshot").unwrap(); - let bank = &bank_forks.working_bank(); - (bank_forks, vec![], LeaderScheduleCache::new_from_bank(bank)) - } else { - blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths) - .expect("process_blocktree failed") - }; + let (bank_forks, bank_forks_info, leader_schedule_cache) = + get_bank_forks(&genesis_block, &blocktree, account_paths, use_snapshot); ( bank_forks, diff --git a/fullnode/src/main.rs b/fullnode/src/main.rs index 31d5ea4b1b3239..05d7217c8dcebe 100644 --- a/fullnode/src/main.rs +++ b/fullnode/src/main.rs @@ -153,7 +153,13 @@ fn main() { .validator(port_range_validator) .help("Range to use for dynamically assigned ports"), ) - .get_matches(); + .arg( + clap::Arg::with_name("use_snapshot") + .long("use-snapshot") + .takes_value(false) + .help("Load / Store bank snapshots"), + ) + .get_matches(); let mut fullnode_config = FullnodeConfig::default(); let keypair = if let Some(identity) = matches.value_of("identity") { @@ -183,6 +189,8 @@ fn main() { fullnode_config.sigverify_disabled = matches.is_present("no_sigverify"); + fullnode_config.use_snapshot = matches.is_present("use_snapshot"); + fullnode_config.voting_disabled = matches.is_present("no_voting"); if matches.is_present("enable_rpc_exit") { diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 0d312a348d9aef..2c551418926d99 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -56,9 +56,10 @@ pub struct Accounts { impl Drop for Accounts { fn drop(&mut self) { - if self.own_paths { + if self.own_paths && (Arc::strong_count(&self.accounts_db) == 1) { let paths = get_paths_vec(&self.paths); paths.iter().for_each(|p| { + info!("drop remove {:?}", p); let _ignored = remove_dir_all(p); // it is safe to delete the parent