diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index c15625b9d8e476..6dca53ab9c36e4 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -1,10 +1,15 @@ //! The `bank_forks` module implments BankForks a DAG of checkpointed Banks -use hashbrown::{HashMap, HashSet}; +use bincode::{deserialize_from, serialize_into}; use solana_metrics::counter::Counter; use solana_runtime::bank::Bank; use solana_sdk::timing; +use std::collections::{HashMap, HashSet}; +use std::fs; +use std::fs::File; +use std::io::{BufReader, BufWriter, Error, ErrorKind}; use std::ops::Index; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Instant; @@ -119,14 +124,72 @@ impl BankForks { self.banks .retain(|slot, bank| *slot >= root || bank.is_in_subtree_of(root)) } + + fn get_io_error(error: &str) -> Error { + Error::new(ErrorKind::Other, error) + } + + pub fn save_snapshot(&self, path: &str) -> Result<(), Error> { + let mut bank_index: HashMap = HashMap::new(); + let path = Path::new(&path); + fs::create_dir_all(path)?; + for (slot, bank) in &self.frozen_banks() { + let bank_file = format!("bank-{}.snapshot", slot); + let bank_file_path = path.join(bank_file); + bank_index.insert(*slot, bank_file_path.clone()); + let file = File::create(bank_file_path)?; + let mut stream = BufWriter::new(file); + serialize_into(&mut stream, &bank) + .map_err(|_| BankForks::get_io_error("serialize bank error"))?; + } + let index_path = Path::new(&path).join("bank.index"); + fs::create_dir_all(path)?; + { + let file = File::create(index_path)?; + let mut stream = BufWriter::new(file); + serialize_into(&mut stream, &bank_index) + .map_err(|_| BankForks::get_io_error("serialize bank error"))?; + } + Ok(()) + } + + pub fn load_from_snapshot(path: &str) -> Result { + let bank_index = { + let path = Path::new(path).join("bank.index"); + let index_file = File::open(path)?; + let mut stream = BufReader::new(index_file); + let index: HashMap = deserialize_from(&mut stream) + .map_err(|_| BankForks::get_io_error("deserialize snapshot index error"))?; + index + }; + + let mut banks: HashMap> = HashMap::new(); + for (slot, bank_path) in &bank_index { + let file = File::open(bank_path)?; + let mut stream = BufReader::new(file); + let bank: Bank = deserialize_from(&mut stream) + .map_err(|_| BankForks::get_io_error("deserialize bank error"))?; + banks.insert(*slot, Arc::new(bank)); + } + let working_bank = banks[&0].clone(); + Ok(BankForks { + banks, + working_bank, + }) + } } #[cfg(test)] mod tests { use super::*; + use bincode::{deserialize, serialize}; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; + use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_sdk::system_transaction; + use std::env; + use std::fs::remove_dir_all; #[test] fn test_bank_forks() { @@ -152,7 +215,7 @@ mod tests { bank_forks.insert(bank); let descendants = bank_forks.descendants(); let children: Vec = descendants[&0].iter().cloned().collect(); - assert_eq!(children, vec![1, 2]); + assert_eq!(children, vec![2, 1]); assert!(descendants[&1].is_empty()); assert!(descendants[&2].is_empty()); } @@ -196,4 +259,93 @@ mod tests { assert_eq!(bank_forks.active_banks(), vec![1]); } + struct TempPaths { + pub paths: String, + } + + #[macro_export] + macro_rules! tmp_bank_accounts_name { + () => { + &format!("{}-{}", file!(), line!()) + }; + } + + #[macro_export] + macro_rules! get_tmp_bank_accounts_path { + () => { + get_tmp_bank_accounts_path(tmp_bank_accounts_name!()) + }; + } + + impl Drop for TempPaths { + fn drop(&mut self) { + let paths: Vec = self.paths.split(',').map(|s| s.to_string()).collect(); + paths.iter().for_each(|p| { + let _ignored = remove_dir_all(p); + }); + } + } + + fn get_paths_vec(paths: &str) -> Vec { + paths.split(',').map(|s| s.to_string()).collect() + } + + fn get_tmp_bank_accounts_path(paths: &str) -> TempPaths { + let vpaths = get_paths_vec(paths); + let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); + let vpaths: Vec<_> = vpaths + .iter() + .map(|path| format!("{}/{}", out_dir, path)) + .collect(); + TempPaths { + paths: vpaths.join(","), + } + } + + fn save_and_load_snapshot(bank_forks: &BankForks) { + let bank = bank_forks.banks.get(&0).unwrap(); + let tick_height = bank.tick_height(); + let bank_ser = serialize(&bank).unwrap(); + let child_bank = bank_forks.banks.get(&1).unwrap(); + let child_bank_ser = serialize(&child_bank).unwrap(); + let snapshot_path = "snapshots"; + bank_forks.save_snapshot(snapshot_path).unwrap(); + drop(bank_forks); + + let new = BankForks::load_from_snapshot(snapshot_path).unwrap(); + assert_eq!(new[0].tick_height(), tick_height); + let bank: Bank = deserialize(&bank_ser).unwrap(); + let new_bank = new.banks.get(&0).unwrap(); + bank.compare_bank(&new_bank); + let child_bank: Bank = deserialize(&child_bank_ser).unwrap(); + let new_bank = new.banks.get(&1).unwrap(); + child_bank.compare_bank(&new_bank); + drop(new); + let _ = fs::remove_dir_all(snapshot_path); + } + + #[test] + fn test_bank_forks_snapshot_n() { + solana_logger::setup(); + let path = get_tmp_bank_accounts_path!(); + let (genesis_block, mint_keypair) = GenesisBlock::new(10_000); + let bank0 = Bank::new_with_paths(&genesis_block, Some(path.paths.clone())); + bank0.freeze(); + let mut bank_forks = BankForks::new(0, bank0); + for index in 0..10 { + let bank = Bank::new_from_parent(&bank_forks[index], &Pubkey::default(), index + 1); + let key1 = Keypair::new().pubkey(); + let tx = system_transaction::create_user_account( + &mint_keypair, + &key1, + 1, + genesis_block.hash(), + 0, + ); + assert_eq!(bank.process_transaction(&tx), Ok(())); + bank.freeze(); + bank_forks.insert(bank); + save_and_load_snapshot(&bank_forks); + } + } } diff --git a/core/src/blocktree.rs b/core/src/blocktree.rs index 3fdfd39f24410b..352ab5630041f3 100644 --- a/core/src/blocktree.rs +++ b/core/src/blocktree.rs @@ -12,7 +12,7 @@ use solana_kvstore as kvstore; use bincode::deserialize; -use hashbrown::HashMap; +use std::collections::HashMap; #[cfg(not(feature = "kvstore"))] use rocksdb; diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index 3ab0a5037ec1c4..e5c0f8332f3195 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -26,7 +26,6 @@ use crate::staking_utils; use crate::streamer::{BlobReceiver, BlobSender}; use bincode::{deserialize, serialize}; use core::cmp; -use hashbrown::HashMap; use rand::{thread_rng, Rng}; use rayon::prelude::*; use solana_metrics::counter::Counter; @@ -41,6 +40,7 @@ use solana_sdk::signature::{Keypair, KeypairUtil, Signable, Signature}; use solana_sdk::timing::{duration_as_ms, timestamp}; use solana_sdk::transaction::Transaction; use std::cmp::min; +use std::collections::HashMap; use std::fmt; use std::io; use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}; diff --git a/core/src/crds_gossip.rs b/core/src/crds_gossip.rs index 966900fdef1e68..8102d02140b351 100644 --- a/core/src/crds_gossip.rs +++ b/core/src/crds_gossip.rs @@ -8,10 +8,10 @@ use crate::crds_gossip_error::CrdsGossipError; use crate::crds_gossip_pull::CrdsGossipPull; use crate::crds_gossip_push::{CrdsGossipPush, CRDS_GOSSIP_NUM_ACTIVE}; use crate::crds_value::CrdsValue; -use hashbrown::HashMap; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; +use std::collections::HashMap; ///The min size for bloom filters pub const CRDS_GOSSIP_BLOOM_SIZE: usize = 1000; diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 73584391bb1c33..44b5936c074b38 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -16,13 +16,13 @@ use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; use crate::packet::BLOB_DATA_SIZE; use bincode::serialized_size; -use hashbrown::HashMap; use rand; use rand::distributions::{Distribution, WeightedIndex}; use solana_runtime::bloom::Bloom; use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use std::cmp; +use std::collections::HashMap; use std::collections::VecDeque; pub const CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS: u64 = 15000; diff --git a/core/src/crds_gossip_push.rs b/core/src/crds_gossip_push.rs index c8ca02971dcfb7..e8acd98ab70da1 100644 --- a/core/src/crds_gossip_push.rs +++ b/core/src/crds_gossip_push.rs @@ -15,7 +15,6 @@ use crate::crds_gossip_error::CrdsGossipError; use crate::crds_value::{CrdsValue, CrdsValueLabel}; use crate::packet::BLOB_DATA_SIZE; use bincode::serialized_size; -use hashbrown::HashMap; use indexmap::map::IndexMap; use rand; use rand::distributions::{Distribution, WeightedIndex}; @@ -25,6 +24,7 @@ use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; use std::cmp; +use std::collections::HashMap; pub const CRDS_GOSSIP_NUM_ACTIVE: usize = 30; pub const CRDS_GOSSIP_PUSH_FANOUT: usize = 6; diff --git a/core/src/fullnode.rs b/core/src/fullnode.rs index db4a76789fa793..7b0bd74f177e30 100644 --- a/core/src/fullnode.rs +++ b/core/src/fullnode.rs @@ -45,6 +45,7 @@ pub struct FullnodeConfig { pub tick_config: PohServiceConfig, pub account_paths: Option, pub rpc_config: JsonRpcConfig, + pub use_snapshot: bool, } impl Default for FullnodeConfig { fn default() -> Self { @@ -60,6 +61,7 @@ impl Default for FullnodeConfig { tick_config: PohServiceConfig::default(), account_paths: None, rpc_config: JsonRpcConfig::default(), + use_snapshot: false, } } } @@ -95,7 +97,11 @@ impl Fullnode { assert_eq!(id, node.info.id); let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) = - new_banks_from_blocktree(ledger_path, config.account_paths.clone()); + new_banks_from_blocktree( + ledger_path, + config.account_paths.clone(), + config.use_snapshot, + ); let exit = Arc::new(AtomicBool::new(false)); let bank_info = &bank_forks_info[0]; @@ -275,6 +281,7 @@ impl Fullnode { pub fn new_banks_from_blocktree( blocktree_path: &str, account_paths: Option, + use_snapshot: bool, ) -> (BankForks, Vec, Blocktree, Receiver) { let genesis_block = GenesisBlock::load(blocktree_path).expect("Expected to successfully open genesis block"); @@ -282,9 +289,13 @@ pub fn new_banks_from_blocktree( let (blocktree, ledger_signal_receiver) = Blocktree::open_with_signal(blocktree_path) .expect("Expected to successfully open database ledger"); - let (bank_forks, bank_forks_info) = + let (bank_forks, bank_forks_info) = if use_snapshot { + let bank_forks = BankForks::load_from_snapshot("bank-snapshot").unwrap(); + (bank_forks, vec![]) + } else { blocktree_processor::process_blocktree(&genesis_block, &blocktree, account_paths) - .expect("process_blocktree failed"); + .expect("process_blocktree failed") + }; ( bank_forks, diff --git a/core/src/locktower.rs b/core/src/locktower.rs index 083aa595792908..20f063428c60b2 100644 --- a/core/src/locktower.rs +++ b/core/src/locktower.rs @@ -1,11 +1,11 @@ use crate::bank_forks::BankForks; use crate::staking_utils; -use hashbrown::{HashMap, HashSet}; use solana_metrics::influxdb; use solana_runtime::bank::Bank; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; use solana_vote_api::vote_state::{Lockout, Vote, VoteState, MAX_LOCKOUT_HISTORY}; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; pub const VOTE_THRESHOLD_DEPTH: usize = 8; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 3a8a212763f901..30e4baf317c4d2 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -12,7 +12,6 @@ use crate::poh_recorder::PohRecorder; use crate::result::{Error, Result}; use crate::rpc_subscriptions::RpcSubscriptions; use crate::service::Service; -use hashbrown::HashMap; use solana_metrics::counter::Counter; use solana_metrics::influxdb; use solana_runtime::bank::Bank; @@ -22,6 +21,7 @@ use solana_sdk::signature::KeypairUtil; use solana_sdk::timing::{self, duration_as_ms}; use solana_sdk::transaction::Transaction; use solana_vote_api::vote_instruction; +use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex, RwLock}; @@ -637,7 +637,7 @@ mod test { { let voting_keypair = Arc::new(Keypair::new()); let (bank_forks, _bank_forks_info, blocktree, l_receiver) = - new_banks_from_blocktree(&my_ledger_path, None); + new_banks_from_blocktree(&my_ledger_path, None, false); let bank = bank_forks.working_bank(); let blocktree = Arc::new(blocktree); diff --git a/core/src/staking_utils.rs b/core/src/staking_utils.rs index 77e1055411a012..2c769b594f6bd2 100644 --- a/core/src/staking_utils.rs +++ b/core/src/staking_utils.rs @@ -1,9 +1,9 @@ -use hashbrown::HashMap; use solana_runtime::bank::Bank; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; use solana_vote_api::vote_state::VoteState; use std::borrow::Borrow; +use std::collections::HashMap; /// Looks through vote accounts, and finds the latest slot that has achieved /// supermajority lockout @@ -66,7 +66,7 @@ pub fn node_staked_accounts_at_epoch( ) -> Option> { bank.epoch_vote_accounts(epoch_height).map(|epoch_state| { epoch_state - .into_iter() + .iter() .filter_map(|(account_id, account)| { filter_zero_balances(account).map(|stake| (account_id, stake, account)) }) @@ -149,10 +149,10 @@ where mod tests { use super::*; use crate::voting_keypair::tests as voting_keypair_tests; - use hashbrown::HashSet; use solana_sdk::genesis_block::GenesisBlock; use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; + use std::collections::HashSet; use std::iter::FromIterator; use std::sync::Arc; diff --git a/core/tests/cluster_info.rs b/core/tests/cluster_info.rs index 9e140fb9c13cc9..5ef6d4405ea304 100644 --- a/core/tests/cluster_info.rs +++ b/core/tests/cluster_info.rs @@ -1,4 +1,3 @@ -use hashbrown::{HashMap, HashSet}; use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon::prelude::*; use solana::cluster_info::{ @@ -7,6 +6,7 @@ use solana::cluster_info::{ }; use solana::contact_info::ContactInfo; use solana_sdk::pubkey::Pubkey; +use std::collections::{HashMap, HashSet}; use std::sync::mpsc::channel; use std::sync::mpsc::TryRecvError; use std::sync::mpsc::{Receiver, Sender}; diff --git a/core/tests/crds_gossip.rs b/core/tests/crds_gossip.rs index 0ee42b0f521d6f..235b5f2fb0ddae 100644 --- a/core/tests/crds_gossip.rs +++ b/core/tests/crds_gossip.rs @@ -1,5 +1,4 @@ use bincode::serialized_size; -use hashbrown::HashMap; use log::*; use rayon::prelude::*; use solana::contact_info::ContactInfo; @@ -11,6 +10,7 @@ use solana::crds_value::CrdsValueLabel; use solana_sdk::hash::hash; use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; +use std::collections::HashMap; use std::sync::{Arc, Mutex}; type Node = Arc>; diff --git a/core/tests/tvu.rs b/core/tests/tvu.rs index f72f4f4c1b2468..8aed6a4c14d145 100644 --- a/core/tests/tvu.rs +++ b/core/tests/tvu.rs @@ -84,7 +84,7 @@ fn test_replay() { let tvu_addr = target1.info.tvu; let (bank_forks, bank_forks_info, blocktree, ledger_signal_receiver) = - fullnode::new_banks_from_blocktree(&blocktree_path, None); + fullnode::new_banks_from_blocktree(&blocktree_path, None, false); let bank = bank_forks.working_bank(); assert_eq!( bank.get_balance(&mint_keypair.pubkey()), diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index b314676e11067e..03cf0f7ad770e4 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -20,7 +20,7 @@ log = "0.4.2" memmap = "0.6.2" rand = "0.6.5" rayon = "1.0.0" -serde = "1.0.88" +serde = { version = "1.0.88", features = ["rc"] } serde_derive = "1.0.88" serde_json = "1.0.38" solana-logger = { path = "../logger", version = "0.14.0" } diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index 57c54d6cc3bb25..23d18260ae6228 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -6,8 +6,8 @@ use crate::accounts_index::{AccountsIndex, Fork}; use crate::append_vec::StoredAccount; use crate::message_processor::has_duplicates; use bincode::serialize; -use hashbrown::{HashMap, HashSet}; use log::*; +use serde::{Deserialize, Serialize}; use solana_metrics::counter::Counter; use solana_sdk::account::Account; use solana_sdk::fee_calculator::FeeCalculator; @@ -17,6 +17,7 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::transaction::Result; use solana_sdk::transaction::{Transaction, TransactionError}; +use std::collections::{HashMap, HashSet}; use std::env; use std::fs::remove_dir_all; use std::ops::Neg; @@ -28,7 +29,7 @@ const ACCOUNTSDB_DIR: &str = "accountsdb"; const NUM_ACCOUNT_DIRS: usize = 4; /// This structure handles synchronization for db -#[derive(Default)] +#[derive(Default, Debug, Serialize, Deserialize)] pub struct Accounts { /// Single global AccountsDB pub accounts_db: Arc, @@ -46,16 +47,16 @@ pub struct Accounts { impl Drop for Accounts { fn drop(&mut self) { - let paths = get_paths_vec(&self.paths); - paths.iter().for_each(|p| { - let _ignored = remove_dir_all(p); + if self.own_paths { + let paths = get_paths_vec(&self.paths); + paths.iter().for_each(|p| { + let _ignored = remove_dir_all(p); - // it is safe to delete the parent - if self.own_paths { + // it is safe to delete the parent let path = Path::new(p); let _ignored = remove_dir_all(path.parent().unwrap()); - } - }); + }); + } } } @@ -434,6 +435,14 @@ impl Accounts { pub fn add_root(&self, fork: Fork) { self.accounts_db.add_root(fork) } + + pub fn compare_accounts(accounts: &Accounts, daccounts: &Accounts) { + let account_locks = accounts.account_locks.lock().unwrap(); + let daccount_locks = daccounts.account_locks.lock().unwrap(); + assert_eq!(*account_locks, *daccount_locks); + assert_eq!(accounts.paths, daccounts.paths); + assert_eq!(accounts.own_paths, daccounts.own_paths); + } } #[cfg(test)] @@ -441,11 +450,14 @@ mod tests { // TODO: all the bank tests are bank specific, issue: 2194 use super::*; + use bincode::{deserialize_from, serialize_into, serialized_size}; + use rand::{Rng, thread_rng}; use solana_sdk::account::Account; use solana_sdk::hash::Hash; use solana_sdk::instruction::CompiledInstruction; use solana_sdk::signature::{Keypair, KeypairUtil}; use solana_sdk::transaction::Transaction; + use std::io::Cursor; fn load_accounts_with_fee( tx: Transaction, @@ -898,4 +910,41 @@ mod tests { assert_eq!(accounts.hash_internal_state(0), None); } + fn create_accounts(accounts: &Accounts, pubkeys: &mut Vec, num: usize) { + for t in 0..num { + let pubkey = Pubkey::new_rand(); + let account = Account::new((t + 1) as u64, 0, &Account::default().owner); + accounts.store_slow(0, &pubkey, &account); + pubkeys.push(pubkey.clone()); + } + } + + fn check_accounts(accounts: &Accounts, pubkeys: &Vec, num: usize) { + for _ in 1..num { + let idx = thread_rng().gen_range(0, num - 1); + let ancestors = vec![(0, 0)].into_iter().collect(); + let account = accounts.load_slow(&ancestors, &pubkeys[idx]).unwrap(); + let account1 = Account::new((idx + 1) as u64, 0, &Account::default().owner); + assert_eq!(account, account1); + } + } + + #[test] + fn test_accounts_serialize() { + solana_logger::setup(); + let accounts = Accounts::new(Some("serialize_accounts".to_string())); + + let mut pubkeys: Vec = vec![]; + create_accounts(&accounts, &mut pubkeys, 100); + check_accounts(&accounts, &pubkeys, 100); + + let mut buf = vec![0u8; serialized_size(&accounts).unwrap() as usize]; + let mut writer = Cursor::new(&mut buf[..]); + serialize_into(&mut writer, &accounts).unwrap(); + + let mut reader = Cursor::new(&mut buf[..]); + let daccounts: Accounts = deserialize_from(&mut reader).unwrap(); + check_accounts(&daccounts, &pubkeys, 100); + Accounts::compare_accounts(&accounts, &daccounts); + } } diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 5f4955d7c5623b..044c183f6770de 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -20,12 +20,13 @@ use crate::accounts_index::{AccountsIndex, Fork}; use crate::append_vec::{AppendVec, StorageMeta, StoredAccount}; -use hashbrown::{HashMap, HashSet}; +use crate::serde_utils::{deserialize_atomicusize, serialize_atomicusize}; use log::*; use rand::{thread_rng, Rng}; use rayon::prelude::*; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; +use std::collections::{HashMap, HashSet}; use std::fs::{create_dir_all, remove_dir_all}; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -49,7 +50,7 @@ pub struct ErrorCounters { pub missing_signature_for_fee: usize, } -#[derive(Default, Clone)] +#[derive(Deserialize, Serialize, Default, Debug, PartialEq, Clone)] pub struct AccountInfo { /// index identifying the append storage id: AppendVecId, @@ -86,6 +87,7 @@ impl From for AccountStorageStatus { } /// Persistent storage structure holding the accounts +#[derive(Debug, Deserialize, Serialize)] pub struct AccountStorageEntry { fork_id: Fork, @@ -95,9 +97,13 @@ pub struct AccountStorageEntry { /// Keeps track of the number of accounts stored in a specific AppendVec. /// This is periodically checked to reuse the stores that do not have /// any accounts in it. + #[serde(serialize_with = "serialize_atomicusize")] + #[serde(deserialize_with = "deserialize_atomicusize")] count: AtomicUsize, /// status corresponding to the storage + #[serde(serialize_with = "serialize_atomicusize")] + #[serde(deserialize_with = "deserialize_atomicusize")] status: AtomicUsize, } @@ -141,7 +147,7 @@ impl AccountStorageEntry { } // This structure handles the load/store of the accounts -#[derive(Default)] +#[derive(Default, Debug, Serialize, Deserialize)] pub struct AccountsDB { /// Keeps tracks of index into AppendVec on a per fork basis pub accounts_index: RwLock>, @@ -150,9 +156,13 @@ pub struct AccountsDB { pub storage: RwLock, /// distribute the accounts across storage lists + #[serde(serialize_with = "serialize_atomicusize")] + #[serde(deserialize_with = "deserialize_atomicusize")] next_id: AtomicUsize, /// write version + #[serde(serialize_with = "serialize_atomicusize")] + #[serde(deserialize_with = "deserialize_atomicusize")] write_version: AtomicUsize, /// Set of storage paths to pick from @@ -393,8 +403,10 @@ impl AccountsDB { mod tests { // TODO: all the bank tests are bank specific, issue: 2194 use super::*; + use bincode::{deserialize_from, serialize_into, serialized_size}; use rand::{thread_rng, Rng}; use solana_sdk::account::Account; + use std::io::Cursor; fn cleanup_paths(paths: &str) { let paths = get_paths_vec(&paths); @@ -681,14 +693,13 @@ mod tests { stores[0].count.load(Ordering::Relaxed) == count } - fn check_accounts(accounts: &AccountsDB, pubkeys: &Vec, fork: Fork) { - for _ in 1..100 { - let idx = thread_rng().gen_range(0, 99); + fn check_accounts(accounts: &AccountsDB, pubkeys: &Vec, fork: Fork, num: usize) { + for _ in 1..num { + let idx = thread_rng().gen_range(0, num - 1); let ancestors = vec![(fork, 0)].into_iter().collect(); let account = accounts.load_slow(&ancestors, &pubkeys[idx]).unwrap(); - let mut default_account = Account::default(); - default_account.lamports = (idx + 1) as u64; - assert_eq!(default_account, account); + let account1 = Account::new((idx + 1) as u64, 0, &Account::default().owner); + assert_eq!(account, account1); } } @@ -711,7 +722,7 @@ mod tests { let accounts = AccountsDB::new(&paths.paths); let mut pubkeys: Vec = vec![]; create_account(&accounts, &mut pubkeys, 0, 100, 0, 0); - check_accounts(&accounts, &pubkeys, 0); + check_accounts(&accounts, &pubkeys, 0, 100); } #[test] @@ -831,4 +842,27 @@ mod tests { assert!(accounts.load_slow(&ancestors, &pubkeys[0]).is_some()); } + #[test] + fn test_accounts_db_serialize() { + solana_logger::setup(); + let paths = get_tmp_accounts_path!(); + let accounts = AccountsDB::new(&paths.paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 0, 100, 0, 0); + assert_eq!(check_storage(&accounts, 100), true); + check_accounts(&accounts, &pubkeys, 0, 100); + + let mut pubkeys1: Vec = vec![]; + create_account(&accounts, &mut pubkeys1, 1, 10, 0, 0); + + let mut buf = vec![0u8; serialized_size(&accounts).unwrap() as usize]; + let mut writer = Cursor::new(&mut buf[..]); + serialize_into(&mut writer, &accounts).unwrap(); + + let mut reader = Cursor::new(&mut buf[..]); + let daccounts: AccountsDB = deserialize_from(&mut reader).unwrap(); + assert_eq!(check_storage(&daccounts, 110), true); + check_accounts(&daccounts, &pubkeys, 0, 100); + check_accounts(&daccounts, &pubkeys1, 1, 10); + } } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 0ae3e16e7f2784..3fe952c27e80d1 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -1,10 +1,11 @@ -use hashbrown::{HashMap, HashSet}; use log::*; use solana_sdk::pubkey::Pubkey; +use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; pub type Fork = u64; -#[derive(Default)] +#[derive(Debug, Default, Deserialize, Serialize)] pub struct AccountsIndex { account_maps: HashMap>, roots: HashSet, diff --git a/runtime/src/accounts_memory.rs b/runtime/src/accounts_memory.rs new file mode 100644 index 00000000000000..7639014f85e59e --- /dev/null +++ b/runtime/src/accounts_memory.rs @@ -0,0 +1,884 @@ +use crate::accounts::ErrorCounters; +use crate::bank::{BankError, Result}; +use crate::runtime::has_duplicates; +use bincode::serialize; +use hashbrown::{HashMap, HashSet}; +use log::debug; +use solana_metrics::counter::Counter; +use solana_sdk::account::Account; +use solana_sdk::hash::{hash, Hash}; +use solana_sdk::native_loader; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::transaction::Transaction; +use std::collections::BTreeMap; +use std::ops::Deref; +use std::sync::{Mutex, RwLock}; + +pub type InstructionAccounts = Vec; +pub type InstructionLoaders = Vec>; + +#[derive(Debug, Default)] +pub struct ErrorCounters { + pub account_not_found: usize, + pub account_in_use: usize, + pub account_loaded_twice: usize, + pub last_id_not_found: usize, + pub last_id_too_old: usize, + pub reserve_last_id: usize, + pub insufficient_funds: usize, + pub duplicate_signature: usize, + pub call_chain_too_deep: usize, + pub missing_signature_for_fee: usize, +} + +/// This structure handles the load/store of the accounts +struct AccountsDB { + /// Mapping of known public keys/IDs to accounts + accounts: HashMap, + + /// The number of transactions the bank has processed without error since the + /// start of the ledger. + transaction_count: u64, +} + +/// This structure handles synchronization for db +struct Accounts { + accounts_db: RwLock, + + /// set of accounts which are currently in the pipeline + account_locks: Mutex>, + + /// List of all parents corresponding to this fork + parents: RwLock>>, +} + +pub struct HashMapAccountsStore { + accounts: RwLock>>, +} + +impl Default for AccountsDB { + fn default() -> Self { + Self { + accounts: HashMap::new(), + transaction_count: 0, + } + } +} + +impl AccountsDB { + pub fn hash_internal_state(&self) -> Hash { + let mut ordered_accounts = BTreeMap::new(); + + // only hash internal state of the part being voted upon, i.e. since last + // checkpoint + for (pubkey, account) in &self.accounts { + ordered_accounts.insert(*pubkey, account.clone()); + } + + hash(&serialize(&ordered_accounts).unwrap()) + } + + fn load(checkpoints: &[U], pubkey: &Pubkey) -> Option + where + U: Deref, + { + for db in checkpoints { + if let Some(account) = db.accounts.get(pubkey) { + return Some(account.clone()); + } + } + None + } + /// Store the account update. If the update is to delete the account because the token balance + /// is 0, purge needs to be set to true for the delete to occur in place. + pub fn store(&mut self, purge: bool, pubkey: &Pubkey, account: &Account) { + if account.tokens == 0 { + if purge { + // purge if balance is 0 and no checkpoints + self.accounts.remove(pubkey); + } else { + // store default account if balance is 0 and there's a checkpoint + self.accounts.insert(pubkey.clone(), Account::default()); + } + } else { + self.accounts.insert(pubkey.clone(), account.clone()); + } + } + + pub fn store_accounts( + &mut self, + purge: bool, + txs: &[Transaction], + res: &[Result<()>], + loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], + ) { + for (i, raccs) in loaded.iter().enumerate() { + if res[i].is_err() || raccs.is_err() { + continue; + } + + let tx = &txs[i]; + let acc = raccs.as_ref().unwrap(); + for (key, account) in tx.account_keys.iter().zip(acc.0.iter()) { + self.store(purge, key, account); + } + } + } + fn load_tx_accounts( + checkpoints: &[U], + tx: &Transaction, + error_counters: &mut ErrorCounters, + ) -> Result> + where + U: Deref, + { + // Copy all the accounts + if tx.signatures.is_empty() && tx.fee != 0 { + Err(BankError::MissingSignatureForFee) + } else { + // Check for unique account keys + if has_duplicates(&tx.account_keys) { + error_counters.account_loaded_twice += 1; + return Err(BankError::AccountLoadedTwice); + } + + // There is no way to predict what program will execute without an error + // If a fee can pay for execution then the program will be scheduled + let mut called_accounts: Vec = vec![]; + for key in &tx.account_keys { + called_accounts.push(Self::load(checkpoints, key).unwrap_or_default()); + } + if called_accounts.is_empty() || called_accounts[0].tokens == 0 { + error_counters.account_not_found += 1; + Err(BankError::AccountNotFound) + } else if called_accounts[0].tokens < tx.fee { + error_counters.insufficient_funds += 1; + Err(BankError::InsufficientFundsForFee) + } else { + called_accounts[0].tokens -= tx.fee; + Ok(called_accounts) + } + } + } + + fn load_executable_accounts( + checkpoints: &[U], + mut program_id: Pubkey, + error_counters: &mut ErrorCounters, + ) -> Result> + where + U: Deref, + { + let mut accounts = Vec::new(); + let mut depth = 0; + loop { + if native_loader::check_id(&program_id) { + // at the root of the chain, ready to dispatch + break; + } + + if depth >= 5 { + error_counters.call_chain_too_deep += 1; + return Err(BankError::CallChainTooDeep); + } + depth += 1; + + let program = match Self::load(checkpoints, &program_id) { + Some(program) => program, + None => { + error_counters.account_not_found += 1; + return Err(BankError::AccountNotFound); + } + }; + if !program.executable || program.owner == Pubkey::default() { + error_counters.account_not_found += 1; + return Err(BankError::AccountNotFound); + } + + // add loader to chain + accounts.insert(0, (program_id, program.clone())); + + program_id = program.owner; + } + Ok(accounts) + } + + /// For each program_id in the transaction, load its loaders. + fn load_loaders( + checkpoints: &[U], + tx: &Transaction, + error_counters: &mut ErrorCounters, + ) -> Result>> + where + U: Deref, + { + tx.instructions + .iter() + .map(|ix| { + if tx.program_ids.len() <= ix.program_ids_index as usize { + error_counters.account_not_found += 1; + return Err(BankError::AccountNotFound); + } + let program_id = tx.program_ids[ix.program_ids_index as usize]; + Self::load_executable_accounts(checkpoints, program_id, error_counters) + }) + .collect() + } + + fn load_accounts( + checkpoints: &[U], + txs: &[Transaction], + lock_results: Vec>, + error_counters: &mut ErrorCounters, + ) -> Vec> + where + U: Deref, + { + txs.iter() + .zip(lock_results.into_iter()) + .map(|etx| match etx { + (tx, Ok(())) => { + let accounts = Self::load_tx_accounts(checkpoints, tx, error_counters)?; + let loaders = Self::load_loaders(checkpoints, tx, error_counters)?; + Ok((accounts, loaders)) + } + (_, Err(e)) => Err(e), + }) + .collect() + } + + pub fn increment_transaction_count(&mut self, tx_count: usize) { + self.transaction_count += tx_count as u64 + } + + pub fn transaction_count(&self) -> u64 { + self.transaction_count + } + + /// become the root accountsDB + fn squash(&mut self, parents: &[U]) + where + U: Deref, + { + self.transaction_count += parents + .iter() + .fold(0, |sum, parent| sum + parent.transaction_count); + + // for every account in all the parents, load latest and update self if + // absent + for pubkey in parents.iter().flat_map(|parent| parent.accounts.keys()) { + // update self with data from parents unless in self + if self.accounts.get(pubkey).is_none() { + self.accounts + .insert(pubkey.clone(), Self::load(parents, pubkey).unwrap().clone()); + } + } + + // toss any zero-balance accounts, since self is root now + self.accounts.retain(|_, account| account.tokens != 0); + } +} + +impl AccountsStore for HashMapAccountsStore { + fn parent(&self) -> Option> { + self.parent.read().unwrap().clone() + } + + /// Slow because lock is held for 1 operation instead of many + pub fn load_slow(&self, fork: Fork, pubkey: &Pubkey) -> Option { + parents: RwLock>>, + accounts: RwLock>>, + let accounts = self.accounts.read().unwrap().get(&fork).unwrap(); + let mut dbs = vec![&account_info.accounts_db.read().unwrap()]; + let mut db_parents = vec![]; + let mut accounts_parent = accounts.parent(); + while let Some(parent) = bank { + parents.push(parent.clone()); + bank = parent.parent(); + } + parents + + let db1: Vec<_> = account_info + .parents + .iter() + .map(|obj| &obj.accounts_db) + .collect(); + dbs.extend(&db1); + return AccountsDB::load(&dbs, pubkey); + } + + /// Slow because lock is held for 1 operation insted of many + /// * purge - if the account token value is 0 and purge is true then delete the account. + /// purge should be set to false for overlays, and true for the root checkpoint. + pub fn store_slow(&self, fork: Fork, purge: bool, pubkey: &Pubkey, account: &Account) { + let info = self.accounts.read().unwrap().get(&fork).unwrap(); + info.accounts + .write() + .unwrap() + .accounts_db + .store(purge, pubkey, account) + } + + fn lock_account( + account_locks: &mut HashSet, + keys: &[Pubkey], + error_counters: &mut ErrorCounters, + ) -> Result<()> { + // Copy all the accounts + for k in keys { + if account_locks.contains(k) { + error_counters.account_in_use += 1; + return Err(BankError::AccountInUse); + } + } + for k in keys { + account_locks.insert(*k); + } + Ok(()) + } + + fn unlock_account(tx: &Transaction, result: &Result<()>, account_locks: &mut HashSet) { + match result { + Err(BankError::AccountInUse) => (), + _ => { + for k in &tx.account_keys { + account_locks.remove(k); + } + } + } + } + + pub fn hash_internal_state(&self, fork: Fork) -> Hash { + let info = self.accounts.read().unwrap().get(&fork).unwrap(); + info.accounts + .read() + .unwrap() + .accounts_db + .hash_internal_state() + } + + /// This function will prevent multiple threads from modifying the same account state at the + /// same time + #[must_use] + pub fn lock_accounts(&self, fork: Fork, txs: &[Transaction]) -> Vec> { + let locks = self.account_locks.read().unwrap().get(&fork).unwrap(); + let mut account_locks = locks.lock().unwrap(); + let mut error_counters = ErrorCounters::default(); + let rv = txs + .iter() + .map(|tx| Self::lock_account(&mut account_locks, &tx.account_keys, &mut error_counters)) + .collect(); + if error_counters.account_in_use != 0 { + inc_new_counter_info!( + "bank-process_transactions-account_in_use", + error_counters.account_in_use + ); + } + rv + } + + /// Once accounts are unlocked, new transactions that modify that state can enter the pipeline + pub fn unlock_accounts(&self, fork: Fork, txs: &[Transaction], results: &[Result<()>]) { + let locks = self.account_locks.read().unwrap().get(&fork).unwrap(); + let mut account_locks = locks.lock().unwrap(); + debug!("bank unlock accounts"); + txs.iter() + .zip(results.iter()) + .for_each(|(tx, result)| Self::unlock_account(tx, result, &mut account_locks)); + } + + pub fn load_accounts( + &self, + fork: Fork, + txs: &[Transaction], + results: Vec>, + error_counters: &mut ErrorCounters, + ) -> Vec> { + let info = self.accounts.read().unwrap().get(&fork).unwrap(); + let account_info = info.read().unwrap(); + let mut dbs = vec![&account_info.accounts_db]; + let db1: Vec<_> = account_info + .parents + .iter() + .map(|obj| &obj.accounts_db) + .collect(); + dbs.extend(&db1); + let dbs: Vec<_> = checkpoints + .iter() + .map(|obj| obj.accounts_db.read().unwrap()) + .collect(); + AccountsDB::load_accounts(&dbs, txs, results, error_counters) + } + + /// Store the accounts into the DB + /// * purge - if the account token value is 0 and purge is true then delete the account. + /// purge should be set to false for overlays, and true for the root checkpoint. + pub fn store_accounts( + &self, + purge: bool, + txs: &[Transaction], + res: &[Result<()>], + loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], + ) { + self.accounts_db + .write() + .unwrap() + .store_accounts(purge, txs, res, loaded) + } + + pub fn increment_transaction_count(&self, tx_count: usize) { + self.accounts_db + .write() + .unwrap() + .increment_transaction_count(tx_count) + } + + pub fn transaction_count(&self) -> u64 { + self.accounts_db.read().unwrap().transaction_count() + } + + /// accounts starts with an empty data structure for every child/fork + /// this function squashes all the parents into this instance + pub fn squash(&self, parents: &[U]) + where + U: Deref, + { + assert!(self.account_locks.lock().unwrap().is_empty()); + + let dbs: Vec<_> = parents + .iter() + .map(|obj| obj.accounts_db.read().unwrap()) + .collect(); + + self.accounts_db.write().unwrap().squash(&dbs); + } +} + +#[cfg(test)] +mod tests { + // TODO: all the bank tests are bank specific, issue: 2194 + + use super::*; + use solana_sdk::account::Account; + use solana_sdk::hash::Hash; + use solana_sdk::signature::Keypair; + use solana_sdk::signature::KeypairUtil; + use solana_sdk::transaction::Instruction; + use solana_sdk::transaction::Transaction; + + #[test] + fn test_purge() { + let mut db = AccountsDB::default(); + let key = Pubkey::default(); + let account = Account::new(0, 0, Pubkey::default()); + // accounts are deleted when their token value is 0 and purge is true + db.store(false, &key, &account); + assert_eq!(AccountsDB::load(&[&db], &key), Some(account.clone())); + // purge should be set to true for the root checkpoint + db.store(true, &key, &account); + assert_eq!(AccountsDB::load(&[&db], &key), None); + } + + fn load_accounts( + tx: Transaction, + ka: &Vec<(Pubkey, Account)>, + error_counters: &mut ErrorCounters, + ) -> Vec> { + let accounts = Accounts::default(); + for ka in ka.iter() { + accounts.store_slow(true, &ka.0, &ka.1); + } + + Accounts::load_accounts(&[&accounts], &[tx], vec![Ok(())], error_counters) + } + + #[test] + fn test_load_accounts_no_key() { + let accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let instructions = vec![Instruction::new(1, &(), vec![0])]; + let tx = Transaction::new_with_instructions::( + &[], + &[], + Hash::default(), + 0, + vec![native_loader::id()], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_not_found, 1); + assert_eq!(loaded_accounts.len(), 1); + assert_eq!(loaded_accounts[0], Err(BankError::AccountNotFound)); + } + + #[test] + fn test_load_accounts_no_account_0_exists() { + let accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + + let instructions = vec![Instruction::new(1, &(), vec![0])]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[], + Hash::default(), + 0, + vec![native_loader::id()], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_not_found, 1); + assert_eq!(loaded_accounts.len(), 1); + assert_eq!(loaded_accounts[0], Err(BankError::AccountNotFound)); + } + + #[test] + fn test_load_accounts_unknown_program_id() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let key0 = keypair.pubkey(); + let key1 = Pubkey::new(&[5u8; 32]); + + let account = Account::new(1, 1, Pubkey::default()); + accounts.push((key0, account)); + + let account = Account::new(2, 1, Pubkey::default()); + accounts.push((key1, account)); + + let instructions = vec![Instruction::new(1, &(), vec![0])]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[], + Hash::default(), + 0, + vec![Pubkey::default()], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_not_found, 1); + assert_eq!(loaded_accounts.len(), 1); + assert_eq!(loaded_accounts[0], Err(BankError::AccountNotFound)); + } + + #[test] + fn test_load_accounts_insufficient_funds() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let key0 = keypair.pubkey(); + + let account = Account::new(1, 1, Pubkey::default()); + accounts.push((key0, account)); + + let instructions = vec![Instruction::new(1, &(), vec![0])]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[], + Hash::default(), + 10, + vec![native_loader::id()], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.insufficient_funds, 1); + assert_eq!(loaded_accounts.len(), 1); + assert_eq!(loaded_accounts[0], Err(BankError::InsufficientFundsForFee)); + } + + #[test] + fn test_load_accounts_no_loaders() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let key0 = keypair.pubkey(); + let key1 = Pubkey::new(&[5u8; 32]); + + let account = Account::new(1, 1, Pubkey::default()); + accounts.push((key0, account)); + + let account = Account::new(2, 1, Pubkey::default()); + accounts.push((key1, account)); + + let instructions = vec![Instruction::new(0, &(), vec![0, 1])]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[key1], + Hash::default(), + 0, + vec![native_loader::id()], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_not_found, 0); + assert_eq!(loaded_accounts.len(), 1); + match &loaded_accounts[0] { + Ok((a, l)) => { + assert_eq!(a.len(), 2); + assert_eq!(a[0], accounts[0].1); + assert_eq!(l.len(), 1); + assert_eq!(l[0].len(), 0); + } + Err(e) => Err(e).unwrap(), + } + } + + #[test] + fn test_load_accounts_max_call_depth() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let key0 = keypair.pubkey(); + let key1 = Pubkey::new(&[5u8; 32]); + let key2 = Pubkey::new(&[6u8; 32]); + let key3 = Pubkey::new(&[7u8; 32]); + let key4 = Pubkey::new(&[8u8; 32]); + let key5 = Pubkey::new(&[9u8; 32]); + let key6 = Pubkey::new(&[10u8; 32]); + + let account = Account::new(1, 1, Pubkey::default()); + accounts.push((key0, account)); + + let mut account = Account::new(40, 1, Pubkey::default()); + account.executable = true; + account.owner = native_loader::id(); + accounts.push((key1, account)); + + let mut account = Account::new(41, 1, Pubkey::default()); + account.executable = true; + account.owner = key1; + accounts.push((key2, account)); + + let mut account = Account::new(42, 1, Pubkey::default()); + account.executable = true; + account.owner = key2; + accounts.push((key3, account)); + + let mut account = Account::new(43, 1, Pubkey::default()); + account.executable = true; + account.owner = key3; + accounts.push((key4, account)); + + let mut account = Account::new(44, 1, Pubkey::default()); + account.executable = true; + account.owner = key4; + accounts.push((key5, account)); + + let mut account = Account::new(45, 1, Pubkey::default()); + account.executable = true; + account.owner = key5; + accounts.push((key6, account)); + + let instructions = vec![Instruction::new(0, &(), vec![0])]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[], + Hash::default(), + 0, + vec![key6], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.call_chain_too_deep, 1); + assert_eq!(loaded_accounts.len(), 1); + assert_eq!(loaded_accounts[0], Err(BankError::CallChainTooDeep)); + } + + #[test] + fn test_load_accounts_bad_program_id() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let key0 = keypair.pubkey(); + let key1 = Pubkey::new(&[5u8; 32]); + + let account = Account::new(1, 1, Pubkey::default()); + accounts.push((key0, account)); + + let mut account = Account::new(40, 1, Pubkey::default()); + account.executable = true; + account.owner = Pubkey::default(); + accounts.push((key1, account)); + + let instructions = vec![Instruction::new(0, &(), vec![0])]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[], + Hash::default(), + 0, + vec![key1], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_not_found, 1); + assert_eq!(loaded_accounts.len(), 1); + assert_eq!(loaded_accounts[0], Err(BankError::AccountNotFound)); + } + + #[test] + fn test_load_accounts_not_executable() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let key0 = keypair.pubkey(); + let key1 = Pubkey::new(&[5u8; 32]); + + let account = Account::new(1, 1, Pubkey::default()); + accounts.push((key0, account)); + + let mut account = Account::new(40, 1, Pubkey::default()); + account.owner = native_loader::id(); + accounts.push((key1, account)); + + let instructions = vec![Instruction::new(0, &(), vec![0])]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[], + Hash::default(), + 0, + vec![key1], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_not_found, 1); + assert_eq!(loaded_accounts.len(), 1); + assert_eq!(loaded_accounts[0], Err(BankError::AccountNotFound)); + } + + #[test] + fn test_load_accounts_multiple_loaders() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let key0 = keypair.pubkey(); + let key1 = Pubkey::new(&[5u8; 32]); + let key2 = Pubkey::new(&[6u8; 32]); + let key3 = Pubkey::new(&[7u8; 32]); + + let account = Account::new(1, 1, Pubkey::default()); + accounts.push((key0, account)); + + let mut account = Account::new(40, 1, Pubkey::default()); + account.executable = true; + account.owner = native_loader::id(); + accounts.push((key1, account)); + + let mut account = Account::new(41, 1, Pubkey::default()); + account.executable = true; + account.owner = key1; + accounts.push((key2, account)); + + let mut account = Account::new(42, 1, Pubkey::default()); + account.executable = true; + account.owner = key2; + accounts.push((key3, account)); + + let instructions = vec![ + Instruction::new(0, &(), vec![0]), + Instruction::new(1, &(), vec![0]), + ]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[], + Hash::default(), + 0, + vec![key1, key2], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_not_found, 0); + assert_eq!(loaded_accounts.len(), 1); + match &loaded_accounts[0] { + Ok((a, l)) => { + assert_eq!(a.len(), 1); + assert_eq!(a[0], accounts[0].1); + assert_eq!(l.len(), 2); + assert_eq!(l[0].len(), 1); + assert_eq!(l[1].len(), 2); + for instruction_loaders in l.iter() { + for (i, a) in instruction_loaders.iter().enumerate() { + // +1 to skip first not loader account + assert_eq![a.1, accounts[i + 1].1]; + } + } + } + Err(e) => Err(e).unwrap(), + } + } + + #[test] + fn test_load_account_pay_to_self() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let pubkey = keypair.pubkey(); + + let account = Account::new(10, 1, Pubkey::default()); + accounts.push((pubkey, account)); + + let instructions = vec![Instruction::new(0, &(), vec![0, 1])]; + // Simulate pay-to-self transaction, which loads the same account twice + let tx = Transaction::new_with_instructions( + &[&keypair], + &[pubkey], + Hash::default(), + 0, + vec![native_loader::id()], + instructions, + ); + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_loaded_twice, 1); + assert_eq!(loaded_accounts.len(), 1); + loaded_accounts[0].clone().unwrap_err(); + assert_eq!(loaded_accounts[0], Err(BankError::AccountLoadedTwice)); + } + + #[test] + fn test_accounts_squash() { + let mut db0 = AccountsDB::default(); + let key = Pubkey::default(); + let account = Account::new(1, 0, key); + + // store value 1 in the "root", i.e. db zero + db0.store(true, &key, &account); + + // store value 0 in the child, but don't purge (see purge test above) + let mut db1 = AccountsDB::default(); + db1.store(false, &key, &Account::new(0, 0, key)); + + // squash, which should whack key's account + db1.squash(&[&db0]); + + assert_eq!(AccountsDB::load(&[&db1], &key), None); + } + +} diff --git a/runtime/src/accounts_persist.rs b/runtime/src/accounts_persist.rs new file mode 100644 index 00000000000000..c5b52513f45f11 --- /dev/null +++ b/runtime/src/accounts_persist.rs @@ -0,0 +1,1515 @@ +use crate::appendvec::AppendVec; +use crate::bank::{BankError, Result}; +use crate::runtime::has_duplicates; +use bincode::serialize; +use hashbrown::{HashMap, HashSet}; +use log::debug; +use solana_metrics::counter::Counter; +use solana_sdk::account::Account; +use solana_sdk::hash::{hash, Hash}; +use solana_sdk::native_loader; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::signature::{Keypair, KeypairUtil}; +use solana_sdk::transaction::Transaction; +use solana_sdk::vote_program; +use std::collections::BTreeMap; +use std::env; +use std::fs::{create_dir_all, remove_dir_all}; +use std::path::Path; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex, RwLock}; + +pub type InstructionAccounts = Vec; +pub type InstructionLoaders = Vec>; + +#[derive(Debug, Default)] +pub struct ErrorCounters { + pub account_not_found: usize, + pub account_in_use: usize, + pub account_loaded_twice: usize, + pub last_id_not_found: usize, + pub last_id_too_old: usize, + pub reserve_last_id: usize, + pub insufficient_funds: usize, + pub duplicate_signature: usize, + pub call_chain_too_deep: usize, + pub missing_signature_for_fee: usize, +} + +// +// Persistent accounts are stored in below path location: +// //data/ +// +// Each account is stored in below format: +// +// +// The persistent store would allow for this mode of operation: +// - Concurrent single thread append with many concurrent readers. +// - Exclusive resize or truncate from the start. +// +// The underlying memory is memory mapped to a file. The accounts would be +// stored across multiple files and the mappings of file and offset of a +// particular account would be stored in a shared index. This will allow for +// concurrent commits without blocking reads, which will sequentially write +// to memory, ssd or disk, and should be as fast as the hardware allow for. +// The only required in memory data structure with a write lock is the index, +// which should be fast to update. +// +// To garbage collect, data can be re-appended to defragmnted and truncated from +// the start. The AccountsDB data structure would allow for +// - multiple readers +// - multiple writers +// - persistent backed memory +// +// To bootstrap the index from a persistent store of AppendVec's, the entries should +// also include a "commit counter". A single global atomic that tracks the number +// of commits to the entire data store. So the latest commit for each fork entry +// would be indexed. (TODO) + +const ACCOUNT_DATA_FILE_SIZE: u64 = 64 * 1024 * 1024; +const ACCOUNT_DATA_FILE: &str = "data"; +const ACCOUNTSDB_DIR: &str = "accountsdb"; +const NUM_ACCOUNT_DIRS: usize = 4; + +/// An offset into the AccountsDB::storage vector +type AppendVecId = usize; + +type Fork = u64; + +struct AccountMap(RwLock>); + +#[derive(Debug, PartialEq)] +enum AccountStorageStatus { + StorageAvailable = 0, + StorageFull = 1, +} + +impl From for AccountStorageStatus { + fn from(status: usize) -> Self { + use self::AccountStorageStatus::*; + match status { + 0 => StorageAvailable, + 1 => StorageFull, + _ => unreachable!(), + } + } +} + +struct AccountIndexInfo { + /// For each Pubkey, the account for a specific fork is in a specific + /// AppendVec at a specific index + index: RwLock>, + + /// Cached index to vote accounts for performance reasons to avoid having + /// to iterate through the entire accounts each time + vote_index: RwLock>, +} + +/// Persistent storage structure holding the accounts +struct AccountStorage { + /// storage holding the accounts + appendvec: Arc>>, + + /// Keeps track of the number of accounts stored in a specific AppendVec. + /// This is periodically checked to reuse the stores that do not have + /// any accounts in it. + count: AtomicUsize, + + /// status corresponding to the storage + status: AtomicUsize, + + /// Path to the persistent store + path: String, +} + +impl AccountStorage { + pub fn set_status(&self, status: AccountStorageStatus) { + self.status.store(status as usize, Ordering::Relaxed); + } + + pub fn get_status(&self) -> AccountStorageStatus { + self.status.load(Ordering::Relaxed).into() + } +} + +#[derive(Default)] +struct AccountsForkInfo { + /// The number of transactions the bank has processed without error since the + /// start of the ledger. + transaction_count: u64, + + /// List of all parents corresponding to this fork + parents: Vec, +} + +// This structure handles the load/store of the accounts +struct AccountsDB { + /// Keeps tracks of index into AppendVec on a per fork basis + index_info: AccountIndexInfo, + + /// Account storage + storage: RwLock>, + + /// distribute the accounts across storage lists + next_id: AtomicUsize, + + /// Information related to the fork + fork_info: RwLock>, +} + +/// This structure handles synchronization for db +pub struct Accounts { + accounts_db: AccountsDB, + + /// set of accounts which are currently in the pipeline + account_locks: Mutex>>, + + /// List of persistent stores + paths: String, +} + +fn get_paths_vec(paths: &str) -> Vec { + paths.split(',').map(|s| s.to_string()).collect() +} + +fn cleanup_dirs(paths: &str) { + let paths = get_paths_vec(&paths); + paths.iter().for_each(|p| { + let _ignored = remove_dir_all(p); + let path = Path::new(p); + let _ignored = remove_dir_all(path.parent().unwrap()); + }); +} + +impl Drop for Accounts { + fn drop(&mut self) { + cleanup_dirs(&self.paths); + } +} + +impl AccountsDB { + pub fn new(fork: Fork, paths: &str) -> Self { + let index_info = AccountIndexInfo { + index: RwLock::new(HashMap::new()), + vote_index: RwLock::new(HashSet::new()), + }; + let accounts_db = AccountsDB { + index_info, + storage: RwLock::new(vec![]), + next_id: AtomicUsize::new(0), + fork_info: RwLock::new(HashMap::new()), + }; + accounts_db.add_storage(paths); + accounts_db.add_fork(fork, None); + accounts_db + } + + pub fn add_fork(&self, fork: Fork, parent: Option) { + let mut info = self.fork_info.write().unwrap(); + let mut fork_info = AccountsForkInfo::default(); + if parent.is_some() { + fork_info.parents.push(parent.unwrap()); + if let Some(list) = info.get(&parent.unwrap()) { + fork_info.parents.extend_from_slice(&list.parents); + } + } + info.insert(fork, fork_info); + } + + fn add_storage(&self, paths: &str) { + let paths = get_paths_vec(&paths); + let mut stores: Vec = vec![]; + paths.iter().for_each(|p| { + let storage = AccountStorage { + appendvec: self.new_account_storage(&p), + status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize), + count: AtomicUsize::new(0), + path: p.to_string(), + }; + stores.push(storage); + }); + let mut storage = self.storage.write().unwrap(); + storage.append(&mut stores); + } + + fn new_account_storage(&self, p: &str) -> Arc>> { + let id = self.next_id.fetch_add(1, Ordering::Relaxed); + let p = format!("{}/{}", p, id); + let path = Path::new(&p); + let _ignored = remove_dir_all(path); + create_dir_all(path).expect("Create directory failed"); + Arc::new(RwLock::new(AppendVec::::new( + &path.join(ACCOUNT_DATA_FILE), + true, + ACCOUNT_DATA_FILE_SIZE, + 0, + ))) + } + + pub fn get_vote_accounts(&self, fork: Fork) -> Vec { + let mut accounts: Vec = vec![]; + self.index_info + .vote_index + .read() + .unwrap() + .iter() + .for_each(|p| { + if let Some(map) = self.index_info.index.read().unwrap().get(p) { + let forks = map.0.read().unwrap(); + if let Some((id, offset)) = forks.get(&fork) { + accounts.push(self.get_account(*id, *offset)); + } else { + let fork_info = self.fork_info.read().unwrap(); + if let Some(info) = fork_info.get(&fork) { + for parent_fork in info.parents.iter() { + if let Some((id, offset)) = forks.get(&parent_fork) { + accounts.push(self.get_account(*id, *offset)); + } + } + } + } + } + }); + accounts + } + + pub fn has_accounts(&self, fork: Fork) -> bool { + let index = self.index_info.index.read().unwrap(); + + for entry in index.values() { + let account_map = entry.0.read().unwrap(); + if account_map.contains_key(&fork) { + return true; + } + } + false + } + + pub fn hash_internal_state(&self, fork: Fork) -> Option { + let mut ordered_accounts = BTreeMap::new(); + let rindex = self.index_info.index.read().unwrap(); + rindex.iter().for_each(|(p, entry)| { + let forks = entry.0.read().unwrap(); + if let Some((id, index)) = forks.get(&fork) { + let account = self.storage.read().unwrap()[*id] + .appendvec + .read() + .unwrap() + .get_account(*index) + .unwrap(); + ordered_accounts.insert(*p, account); + } + }); + + if ordered_accounts.is_empty() { + return None; + } + Some(hash(&serialize(&ordered_accounts).unwrap())) + } + + fn get_account(&self, id: AppendVecId, offset: u64) -> Account { + let appendvec = &self.storage.read().unwrap()[id].appendvec; + let av = appendvec.read().unwrap(); + av.get_account(offset).unwrap() + } + + fn load(&self, fork: Fork, pubkey: &Pubkey, walk_back: bool) -> Option { + let index = self.index_info.index.read().unwrap(); + if let Some(map) = index.get(pubkey) { + let forks = map.0.read().unwrap(); + // find most recent fork that is an ancestor of current_fork + if let Some((id, offset)) = forks.get(&fork) { + return Some(self.get_account(*id, *offset)); + } else { + if !walk_back { + return None; + } + let fork_info = self.fork_info.read().unwrap(); + if let Some(info) = fork_info.get(&fork) { + for parent_fork in info.parents.iter() { + if let Some((id, offset)) = forks.get(&parent_fork) { + return Some(self.get_account(*id, *offset)); + } + } + } + } + } + None + } + + fn get_storage_id(&self, start: usize, current: usize) -> usize { + let mut id = current; + let len: usize; + { + let stores = self.storage.read().unwrap(); + len = stores.len(); + if id == std::usize::MAX { + id = start % len; + if stores[id].get_status() == AccountStorageStatus::StorageAvailable { + return id; + } + } else { + stores[id].set_status(AccountStorageStatus::StorageFull); + } + + loop { + id = (id + 1) % len; + if stores[id].get_status() == AccountStorageStatus::StorageAvailable { + break; + } + if id == start % len { + break; + } + } + } + if id == start % len { + let mut stores = self.storage.write().unwrap(); + // check if new store was already created + if stores.len() == len { + let storage = AccountStorage { + appendvec: self.new_account_storage(&stores[id].path), + count: AtomicUsize::new(0), + status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize), + path: stores[id].path.clone(), + }; + stores.push(storage); + } + id = stores.len() - 1; + } + id + } + + fn append_account(&self, account: &Account) -> (usize, u64) { + let offset: u64; + let start = self.next_id.fetch_add(1, Ordering::Relaxed); + let mut id = self.get_storage_id(start, std::usize::MAX); + let mut acc = &Account::default(); + loop { + let result: Option; + { + if account.tokens != 0 { + acc = account; + } + let av = &self.storage.read().unwrap()[id].appendvec; + result = av.read().unwrap().append_account(&acc); + } + if let Some(val) = result { + offset = val; + break; + } else { + id = self.get_storage_id(start, id); + } + } + (id, offset) + } + + fn remove_account_entries(&self, entries: &[Fork], map: &AccountMap) -> bool { + let mut forks = map.0.write().unwrap(); + for fork in entries.iter() { + if let Some((id, _)) = forks.remove(&fork) { + let stores = self.storage.read().unwrap(); + if stores[id].count.fetch_sub(1, Ordering::Relaxed) == 1 { + stores[id].appendvec.write().unwrap().reset(); + stores[id].set_status(AccountStorageStatus::StorageAvailable); + } + } + } + forks.is_empty() + } + + fn insert_account_entry(&self, fork: Fork, id: AppendVecId, offset: u64, map: &AccountMap) { + let mut forks = map.0.write().unwrap(); + let stores = self.storage.read().unwrap(); + stores[id].count.fetch_add(1, Ordering::Relaxed); + if let Some((old_id, _)) = forks.insert(fork, (id, offset)) { + if stores[old_id].count.fetch_sub(1, Ordering::Relaxed) == 1 { + stores[old_id].appendvec.write().unwrap().reset(); + stores[old_id].set_status(AccountStorageStatus::StorageAvailable); + } + } + } + + /// Store the account update. If the update is to delete the account because + /// the token balance is 0, purge needs to be set to true for the delete + /// to occur in place. + fn store_account(&self, fork: Fork, purge: bool, pubkey: &Pubkey, account: &Account) { + if account.tokens == 0 && purge { + // purge if balance is 0 and no checkpoints + let index = self.index_info.index.read().unwrap(); + let map = index.get(&pubkey).unwrap(); + self.remove_account_entries(&[fork], &map); + if vote_program::check_id(&account.owner) { + self.index_info.vote_index.write().unwrap().remove(pubkey); + } + } else { + let (id, offset) = self.append_account(&account); + + if vote_program::check_id(&account.owner) { + let mut index = self.index_info.vote_index.write().unwrap(); + if account.tokens == 0 { + index.remove(pubkey); + } else { + index.insert(*pubkey); + } + } + + let index = self.index_info.index.read().unwrap(); + let map = index.get(&pubkey).unwrap(); + self.insert_account_entry(fork, id, offset, &map); + } + } + + pub fn store(&self, fork: Fork, purge: bool, pubkey: &Pubkey, account: &Account) { + { + if !self.index_info.index.read().unwrap().contains_key(&pubkey) { + let mut windex = self.index_info.index.write().unwrap(); + windex.insert(*pubkey, AccountMap(RwLock::new(HashMap::new()))); + } + } + self.store_account(fork, purge, pubkey, account); + } + + pub fn store_accounts( + &self, + fork: Fork, + purge: bool, + txs: &[Transaction], + res: &[Result<()>], + loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], + ) { + let mut keys = vec![]; + { + let index = self.index_info.index.read().unwrap(); + for (i, raccs) in loaded.iter().enumerate() { + if res[i].is_err() || raccs.is_err() { + continue; + } + let tx = &txs[i]; + for key in tx.account_keys.iter() { + if !index.contains_key(&key) { + keys.push(*key); + } + } + } + } + if !keys.is_empty() { + let mut index = self.index_info.index.write().unwrap(); + for key in keys.iter() { + index.insert(*key, AccountMap(RwLock::new(HashMap::new()))); + } + } + for (i, raccs) in loaded.iter().enumerate() { + if res[i].is_err() || raccs.is_err() { + continue; + } + + let tx = &txs[i]; + let acc = raccs.as_ref().unwrap(); + for (key, account) in tx.account_keys.iter().zip(acc.0.iter()) { + self.store(fork, purge, key, account); + } + } + } + + fn load_tx_accounts( + &self, + fork: Fork, + tx: &Transaction, + error_counters: &mut ErrorCounters, + ) -> Result> { + // Copy all the accounts + if tx.signatures.is_empty() && tx.fee != 0 { + Err(BankError::MissingSignatureForFee) + } else { + // Check for unique account keys + if has_duplicates(&tx.account_keys) { + error_counters.account_loaded_twice += 1; + return Err(BankError::AccountLoadedTwice); + } + + // There is no way to predict what program will execute without an error + // If a fee can pay for execution then the program will be scheduled + let mut called_accounts: Vec = vec![]; + for key in &tx.account_keys { + called_accounts.push(self.load(fork, key, true).unwrap_or_default()); + } + if called_accounts.is_empty() || called_accounts[0].tokens == 0 { + error_counters.account_not_found += 1; + Err(BankError::AccountNotFound) + } else if called_accounts[0].tokens < tx.fee { + error_counters.insufficient_funds += 1; + Err(BankError::InsufficientFundsForFee) + } else { + called_accounts[0].tokens -= tx.fee; + Ok(called_accounts) + } + } + } + + fn load_executable_accounts( + &self, + fork: Fork, + mut program_id: Pubkey, + error_counters: &mut ErrorCounters, + ) -> Result> { + let mut accounts = Vec::new(); + let mut depth = 0; + loop { + if native_loader::check_id(&program_id) { + // at the root of the chain, ready to dispatch + break; + } + + if depth >= 5 { + error_counters.call_chain_too_deep += 1; + return Err(BankError::CallChainTooDeep); + } + depth += 1; + + let program = match self.load(fork, &program_id, true) { + Some(program) => program, + None => { + error_counters.account_not_found += 1; + return Err(BankError::AccountNotFound); + } + }; + if !program.executable || program.owner == Pubkey::default() { + error_counters.account_not_found += 1; + return Err(BankError::AccountNotFound); + } + + // add loader to chain + accounts.insert(0, (program_id, program.clone())); + + program_id = program.owner; + } + Ok(accounts) + } + + /// For each program_id in the transaction, load its loaders. + fn load_loaders( + &self, + fork: Fork, + tx: &Transaction, + error_counters: &mut ErrorCounters, + ) -> Result>> { + tx.instructions + .iter() + .map(|ix| { + if tx.program_ids.len() <= ix.program_ids_index as usize { + error_counters.account_not_found += 1; + return Err(BankError::AccountNotFound); + } + let program_id = tx.program_ids[ix.program_ids_index as usize]; + self.load_executable_accounts(fork, program_id, error_counters) + }) + .collect() + } + + fn load_accounts( + &self, + fork: Fork, + txs: &[Transaction], + lock_results: Vec>, + error_counters: &mut ErrorCounters, + ) -> Vec> { + txs.iter() + .zip(lock_results.into_iter()) + .map(|etx| match etx { + (tx, Ok(())) => { + let accounts = self.load_tx_accounts(fork, tx, error_counters)?; + let loaders = self.load_loaders(fork, tx, error_counters)?; + Ok((accounts, loaders)) + } + (_, Err(e)) => Err(e), + }) + .collect() + } + + pub fn increment_transaction_count(&self, fork: Fork, tx_count: usize) { + let mut info = self.fork_info.write().unwrap(); + let entry = info.entry(fork).or_insert(AccountsForkInfo::default()); + entry.transaction_count += tx_count as u64; + } + + pub fn transaction_count(&self, fork: Fork) -> u64 { + let info = self.fork_info.read().unwrap(); + if let Some(entry) = info.get(&fork) { + entry.transaction_count + } else { + 0 + } + } + + fn remove_parents(&self, fork: Fork) -> Vec { + let mut info = self.fork_info.write().unwrap(); + let fork_info = info.get_mut(&fork).unwrap(); + let parents = fork_info.parents.split_off(0); + info.retain(|&f, _| !parents.contains(&f)); + parents + } + + fn get_merged_index( + &self, + fork: Fork, + parents: &[Fork], + map: &AccountMap, + ) -> Option<(Fork, AppendVecId, u64)> { + let forks = map.0.read().unwrap(); + if let Some((id, offset)) = forks.get(&fork) { + return Some((fork, *id, *offset)); + } else { + for parent_fork in parents.iter() { + if let Some((id, offset)) = forks.get(parent_fork) { + return Some((*parent_fork, *id, *offset)); + } + } + } + None + } + + /// become the root accountsDB + fn squash(&self, fork: Fork) { + let parents = self.remove_parents(fork); + let tx_count = parents + .iter() + .fold(0, |sum, parent| sum + self.transaction_count(*parent)); + self.increment_transaction_count(fork, tx_count as usize); + + // for every account in all the parents, load latest and update self if + // absent + let mut keys = vec![]; + { + let index = self.index_info.index.read().unwrap(); + index.iter().for_each(|(pubkey, map)| { + if let Some((parent_fork, id, offset)) = self.get_merged_index(fork, &parents, &map) + { + if parent_fork != fork { + self.insert_account_entry(fork, id, offset, &map); + if self.remove_account_entries(&parents, &map) { + keys.push(pubkey.clone()); + } + } + let account = self.get_account(id, offset); + if account.tokens == 0 { + if self.remove_account_entries(&[fork], &map) { + keys.push(pubkey.clone()); + } + if vote_program::check_id(&account.owner) { + self.index_info.vote_index.write().unwrap().remove(pubkey); + } + } else { + } + } + }); + } + if !keys.is_empty() { + let mut index = self.index_info.index.write().unwrap(); + for key in keys.iter() { + index.remove(&key); + } + } + } +} + +impl Accounts { + fn make_new_dir() -> String { + static ACCOUNT_DIR: AtomicUsize = AtomicUsize::new(0); + let dir = ACCOUNT_DIR.fetch_add(1, Ordering::Relaxed); + let out_dir = env::var("OUT_DIR").unwrap_or_else(|_| "target".to_string()); + let keypair = Keypair::new(); + format!( + "{}/{}/{}/{}", + out_dir, + ACCOUNTSDB_DIR, + keypair.pubkey(), + dir.to_string() + ) + } + + fn make_default_paths() -> String { + let mut paths = "".to_string(); + for index in 0..NUM_ACCOUNT_DIRS { + if index > 0 { + paths.push_str(","); + } + paths.push_str(&Self::make_new_dir()); + } + paths + } + + pub fn new(fork: Fork, in_paths: Option) -> Self { + let paths = if in_paths.is_none() { + Self::make_default_paths() + } else { + in_paths.unwrap() + }; + let accounts_db = AccountsDB::new(fork, &paths); + accounts_db.add_fork(fork, None); + Accounts { + accounts_db, + account_locks: Mutex::new(HashMap::new()), + paths, + } + } + + pub fn new_from_parent(&self, fork: Fork, parent: Fork) { + self.accounts_db.add_fork(fork, Some(parent)); + } + + /// Slow because lock is held for 1 operation insted of many + pub fn load_slow(&self, fork: Fork, pubkey: &Pubkey) -> Option { + self.accounts_db.load(fork, pubkey, true) + } + + /// Slow because lock is held for 1 operation insted of many + pub fn load_slow_no_parent(&self, fork: Fork, pubkey: &Pubkey) -> Option { + self.accounts_db.load(fork, pubkey, false) + } + + /// Slow because lock is held for 1 operation insted of many + /// * purge - if the account token value is 0 and purge is true then delete the account. + /// purge should be set to false for overlays, and true for the root checkpoint. + pub fn store_slow(&self, fork: Fork, purge: bool, pubkey: &Pubkey, account: &Account) { + self.accounts_db.store(fork, purge, pubkey, account); + } + + fn lock_account( + fork: Fork, + account_locks: &mut HashMap>, + keys: &[Pubkey], + error_counters: &mut ErrorCounters, + ) -> Result<()> { + // Copy all the accounts + let locks = account_locks.entry(fork).or_insert(HashSet::new()); + for k in keys { + if locks.contains(k) { + error_counters.account_in_use += 1; + return Err(BankError::AccountInUse); + } + } + for k in keys { + locks.insert(*k); + } + Ok(()) + } + + fn unlock_account( + fork: Fork, + tx: &Transaction, + result: &Result<()>, + account_locks: &mut HashMap>, + ) { + match result { + Err(BankError::AccountInUse) => (), + _ => { + if let Some(locks) = account_locks.get_mut(&fork) { + for k in &tx.account_keys { + locks.remove(k); + } + if locks.is_empty() { + account_locks.remove(&fork); + } + } + } + } + } + + pub fn hash_internal_state(&self, fork: Fork) -> Option { + self.accounts_db.hash_internal_state(fork) + } + + /// This function will prevent multiple threads from modifying the same account state at the + /// same time + #[must_use] + pub fn lock_accounts(&self, fork: Fork, txs: &[Transaction]) -> Vec> { + let mut account_locks = self.account_locks.lock().unwrap(); + let mut error_counters = ErrorCounters::default(); + let rv = txs + .iter() + .map(|tx| { + Self::lock_account( + fork, + &mut account_locks, + &tx.account_keys, + &mut error_counters, + ) + }) + .collect(); + if error_counters.account_in_use != 0 { + inc_new_counter_info!( + "bank-process_transactions-account_in_use", + error_counters.account_in_use + ); + } + rv + } + + /// Once accounts are unlocked, new transactions that modify that state can enter the pipeline + pub fn unlock_accounts(&self, fork: Fork, txs: &[Transaction], results: &[Result<()>]) { + let mut account_locks = self.account_locks.lock().unwrap(); + debug!("bank unlock accounts"); + txs.iter() + .zip(results.iter()) + .for_each(|(tx, result)| Self::unlock_account(fork, tx, result, &mut account_locks)); + } + + pub fn has_accounts(&self, fork: Fork) -> bool { + self.accounts_db.has_accounts(fork) + } + + pub fn load_accounts( + &self, + fork: Fork, + txs: &[Transaction], + results: Vec>, + error_counters: &mut ErrorCounters, + ) -> Vec> { + self.accounts_db + .load_accounts(fork, txs, results, error_counters) + } + + /// Store the accounts into the DB + /// * purge - if the account token value is 0 and purge is true then delete the account. + /// purge should be set to false for overlays, and true for the root checkpoint. + pub fn store_accounts( + &self, + fork: Fork, + purge: bool, + txs: &[Transaction], + res: &[Result<()>], + loaded: &[Result<(InstructionAccounts, InstructionLoaders)>], + ) { + self.accounts_db + .store_accounts(fork, purge, txs, res, loaded) + } + + pub fn increment_transaction_count(&self, fork: Fork, tx_count: usize) { + self.accounts_db.increment_transaction_count(fork, tx_count) + } + + pub fn transaction_count(&self, fork: Fork) -> u64 { + self.accounts_db.transaction_count(fork) + } + + /// accounts starts with an empty data structure for every child/fork + /// this function squashes all the parents into this instance + pub fn squash(&self, fork: Fork) { + assert!(!self.account_locks.lock().unwrap().contains_key(&fork)); + self.accounts_db.squash(fork); + } +} + +#[cfg(test)] +mod tests { + // TODO: all the bank tests are bank specific, issue: 2194 + + use super::*; + use rand::{thread_rng, Rng}; + use solana_sdk::account::Account; + use solana_sdk::hash::Hash; + use solana_sdk::signature::Keypair; + use solana_sdk::signature::KeypairUtil; + use solana_sdk::transaction::Instruction; + use solana_sdk::transaction::Transaction; + + #[test] + fn test_purge() { + let paths = "purge".to_string(); + let db = AccountsDB::new(0, &paths); + let key = Pubkey::default(); + let account = Account::new(0, 0, Pubkey::default()); + // accounts are deleted when their token value is 0 and purge is true + db.store(0, false, &key, &account); + assert_eq!(db.load(0, &key, true), Some(account.clone())); + // purge should be set to true for the root checkpoint + db.store(0, true, &key, &account); + assert_eq!(db.load(0, &key, true), None); + cleanup_dirs(&paths); + } + + fn load_accounts( + tx: Transaction, + ka: &Vec<(Pubkey, Account)>, + error_counters: &mut ErrorCounters, + ) -> Vec> { + let accounts = Accounts::new(0, None); + for ka in ka.iter() { + accounts.store_slow(0, true, &ka.0, &ka.1); + } + + let res = accounts.load_accounts(0, &[tx], vec![Ok(())], error_counters); + res + } + + #[test] + fn test_load_accounts_no_key() { + let accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let instructions = vec![Instruction::new(1, &(), vec![0])]; + let tx = Transaction::new_with_instructions::( + &[], + &[], + Hash::default(), + 0, + vec![native_loader::id()], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_not_found, 1); + assert_eq!(loaded_accounts.len(), 1); + assert_eq!(loaded_accounts[0], Err(BankError::AccountNotFound)); + } + + #[test] + fn test_load_accounts_no_account_0_exists() { + let accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + + let instructions = vec![Instruction::new(1, &(), vec![0])]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[], + Hash::default(), + 0, + vec![native_loader::id()], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_not_found, 1); + assert_eq!(loaded_accounts.len(), 1); + assert_eq!(loaded_accounts[0], Err(BankError::AccountNotFound)); + } + + #[test] + fn test_load_accounts_unknown_program_id() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let key0 = keypair.pubkey(); + let key1 = Pubkey::new(&[5u8; 32]); + + let account = Account::new(1, 1, Pubkey::default()); + accounts.push((key0, account)); + + let account = Account::new(2, 1, Pubkey::default()); + accounts.push((key1, account)); + + let instructions = vec![Instruction::new(1, &(), vec![0])]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[], + Hash::default(), + 0, + vec![Pubkey::default()], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_not_found, 1); + assert_eq!(loaded_accounts.len(), 1); + assert_eq!(loaded_accounts[0], Err(BankError::AccountNotFound)); + } + + #[test] + fn test_load_accounts_insufficient_funds() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let key0 = keypair.pubkey(); + + let account = Account::new(1, 1, Pubkey::default()); + accounts.push((key0, account)); + + let instructions = vec![Instruction::new(1, &(), vec![0])]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[], + Hash::default(), + 10, + vec![native_loader::id()], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.insufficient_funds, 1); + assert_eq!(loaded_accounts.len(), 1); + assert_eq!(loaded_accounts[0], Err(BankError::InsufficientFundsForFee)); + } + + #[test] + fn test_load_accounts_no_loaders() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let key0 = keypair.pubkey(); + let key1 = Pubkey::new(&[5u8; 32]); + + let account = Account::new(1, 1, Pubkey::default()); + accounts.push((key0, account)); + + let account = Account::new(2, 1, Pubkey::default()); + accounts.push((key1, account)); + + let instructions = vec![Instruction::new(0, &(), vec![0, 1])]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[key1], + Hash::default(), + 0, + vec![native_loader::id()], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_not_found, 0); + assert_eq!(loaded_accounts.len(), 1); + match &loaded_accounts[0] { + Ok((a, l)) => { + assert_eq!(a.len(), 2); + assert_eq!(a[0], accounts[0].1); + assert_eq!(l.len(), 1); + assert_eq!(l[0].len(), 0); + } + Err(e) => Err(e).unwrap(), + } + } + + #[test] + fn test_load_accounts_max_call_depth() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let key0 = keypair.pubkey(); + let key1 = Pubkey::new(&[5u8; 32]); + let key2 = Pubkey::new(&[6u8; 32]); + let key3 = Pubkey::new(&[7u8; 32]); + let key4 = Pubkey::new(&[8u8; 32]); + let key5 = Pubkey::new(&[9u8; 32]); + let key6 = Pubkey::new(&[10u8; 32]); + + let account = Account::new(1, 1, Pubkey::default()); + accounts.push((key0, account)); + + let mut account = Account::new(40, 1, Pubkey::default()); + account.executable = true; + account.owner = native_loader::id(); + accounts.push((key1, account)); + + let mut account = Account::new(41, 1, Pubkey::default()); + account.executable = true; + account.owner = key1; + accounts.push((key2, account)); + + let mut account = Account::new(42, 1, Pubkey::default()); + account.executable = true; + account.owner = key2; + accounts.push((key3, account)); + + let mut account = Account::new(43, 1, Pubkey::default()); + account.executable = true; + account.owner = key3; + accounts.push((key4, account)); + + let mut account = Account::new(44, 1, Pubkey::default()); + account.executable = true; + account.owner = key4; + accounts.push((key5, account)); + + let mut account = Account::new(45, 1, Pubkey::default()); + account.executable = true; + account.owner = key5; + accounts.push((key6, account)); + + let instructions = vec![Instruction::new(0, &(), vec![0])]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[], + Hash::default(), + 0, + vec![key6], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.call_chain_too_deep, 1); + assert_eq!(loaded_accounts.len(), 1); + assert_eq!(loaded_accounts[0], Err(BankError::CallChainTooDeep)); + } + + #[test] + fn test_load_accounts_bad_program_id() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let key0 = keypair.pubkey(); + let key1 = Pubkey::new(&[5u8; 32]); + + let account = Account::new(1, 1, Pubkey::default()); + accounts.push((key0, account)); + + let mut account = Account::new(40, 1, Pubkey::default()); + account.executable = true; + account.owner = Pubkey::default(); + accounts.push((key1, account)); + + let instructions = vec![Instruction::new(0, &(), vec![0])]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[], + Hash::default(), + 0, + vec![key1], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_not_found, 1); + assert_eq!(loaded_accounts.len(), 1); + assert_eq!(loaded_accounts[0], Err(BankError::AccountNotFound)); + } + + #[test] + fn test_load_accounts_not_executable() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let key0 = keypair.pubkey(); + let key1 = Pubkey::new(&[5u8; 32]); + + let account = Account::new(1, 1, Pubkey::default()); + accounts.push((key0, account)); + + let mut account = Account::new(40, 1, Pubkey::default()); + account.owner = native_loader::id(); + accounts.push((key1, account)); + + let instructions = vec![Instruction::new(0, &(), vec![0])]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[], + Hash::default(), + 0, + vec![key1], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_not_found, 1); + assert_eq!(loaded_accounts.len(), 1); + assert_eq!(loaded_accounts[0], Err(BankError::AccountNotFound)); + } + + #[test] + fn test_load_accounts_multiple_loaders() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let key0 = keypair.pubkey(); + let key1 = Pubkey::new(&[5u8; 32]); + let key2 = Pubkey::new(&[6u8; 32]); + let key3 = Pubkey::new(&[7u8; 32]); + + let account = Account::new(1, 1, Pubkey::default()); + accounts.push((key0, account)); + + let mut account = Account::new(40, 1, Pubkey::default()); + account.executable = true; + account.owner = native_loader::id(); + accounts.push((key1, account)); + + let mut account = Account::new(41, 1, Pubkey::default()); + account.executable = true; + account.owner = key1; + accounts.push((key2, account)); + + let mut account = Account::new(42, 1, Pubkey::default()); + account.executable = true; + account.owner = key2; + accounts.push((key3, account)); + + let instructions = vec![ + Instruction::new(0, &(), vec![0]), + Instruction::new(1, &(), vec![0]), + ]; + let tx = Transaction::new_with_instructions( + &[&keypair], + &[], + Hash::default(), + 0, + vec![key1, key2], + instructions, + ); + + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_not_found, 0); + assert_eq!(loaded_accounts.len(), 1); + match &loaded_accounts[0] { + Ok((a, l)) => { + assert_eq!(a.len(), 1); + assert_eq!(a[0], accounts[0].1); + assert_eq!(l.len(), 2); + assert_eq!(l[0].len(), 1); + assert_eq!(l[1].len(), 2); + for instruction_loaders in l.iter() { + for (i, a) in instruction_loaders.iter().enumerate() { + // +1 to skip first not loader account + assert_eq![a.1, accounts[i + 1].1]; + } + } + } + Err(e) => Err(e).unwrap(), + } + } + + #[test] + fn test_load_account_pay_to_self() { + let mut accounts: Vec<(Pubkey, Account)> = Vec::new(); + let mut error_counters = ErrorCounters::default(); + + let keypair = Keypair::new(); + let pubkey = keypair.pubkey(); + + let account = Account::new(10, 1, Pubkey::default()); + accounts.push((pubkey, account)); + + let instructions = vec![Instruction::new(0, &(), vec![0, 1])]; + // Simulate pay-to-self transaction, which loads the same account twice + let tx = Transaction::new_with_instructions( + &[&keypair], + &[pubkey], + Hash::default(), + 0, + vec![native_loader::id()], + instructions, + ); + let loaded_accounts = load_accounts(tx, &accounts, &mut error_counters); + + assert_eq!(error_counters.account_loaded_twice, 1); + assert_eq!(loaded_accounts.len(), 1); + loaded_accounts[0].clone().unwrap_err(); + assert_eq!(loaded_accounts[0], Err(BankError::AccountLoadedTwice)); + } + + #[test] + fn test_accounts_squash() { + let paths = "merge".to_string(); + let db = AccountsDB::new(0, &paths); + let key = Pubkey::default(); + let account = Account::new(1, 0, key); + + // store value 1 in the "root", i.e. db zero + db.store(0, true, &key, &account); + + let mut pubkeys: Vec = vec![]; + create_account(&db, &mut pubkeys, 100, 0); + for _ in 1..100 { + let idx = thread_rng().gen_range(0, 99); + let account = db.load(0, &pubkeys[idx], true).unwrap(); + let mut default_account = Account::default(); + default_account.tokens = (idx + 1) as u64; + assert_eq!(compare_account(&default_account, &account), true); + } + db.add_fork(1, Some(0)); + // store value 0 in the child, but don't purge (see purge test above) + db.store(1, false, &key, &Account::new(0, 0, key)); + + // merge, which should whack key's account + db.squash(1); + + assert_eq!(db.load(1, &key, true), None); + for _ in 1..100 { + let idx = thread_rng().gen_range(0, 99); + assert_eq!(db.load(0, &pubkeys[idx], true), None); + let account = db.load(1, &pubkeys[idx], true).unwrap(); + let mut default_account = Account::default(); + default_account.tokens = (idx + 1) as u64; + assert_eq!(compare_account(&default_account, &account), true); + } + cleanup_dirs(&paths); + } + + fn create_account( + accounts: &AccountsDB, + pubkeys: &mut Vec, + num: usize, + num_vote: usize, + ) { + let mut nvote = num_vote; + for t in 0..num { + let pubkey = Keypair::new().pubkey(); + let mut default_account = Account::default(); + pubkeys.push(pubkey.clone()); + default_account.tokens = (t + 1) as u64; + if nvote > 0 && (t + 1) % nvote == 0 { + default_account.owner = vote_program::id(); + nvote -= 1; + } + assert!(accounts.load(0, &pubkey, true).is_none()); + accounts.store(0, true, &pubkey, &default_account); + } + } + + fn update_accounts(accounts: &AccountsDB, pubkeys: Vec, range: usize) { + for _ in 1..1000 { + let idx = thread_rng().gen_range(0, range); + if let Some(mut account) = accounts.load(0, &pubkeys[idx], true) { + account.tokens = account.tokens + 1; + accounts.store(0, true, &pubkeys[idx], &account); + if account.tokens == 0 { + assert!(accounts.load(0, &pubkeys[idx], true).is_none()); + } else { + let mut default_account = Account::default(); + default_account.tokens = account.tokens; + assert_eq!(compare_account(&default_account, &account), true); + } + } + } + } + + fn compare_account(account1: &Account, account2: &Account) -> bool { + if account1.userdata != account2.userdata + || account1.owner != account2.owner + || account1.executable != account2.executable + || account1.tokens != account2.tokens + { + return false; + } + true + } + + #[test] + fn test_account_one() { + let paths = "one".to_string(); + let accounts = AccountsDB::new(0, &paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 1, 0); + let account = accounts.load(0, &pubkeys[0], true).unwrap(); + let mut default_account = Account::default(); + default_account.tokens = 1; + assert_eq!(compare_account(&default_account, &account), true); + cleanup_dirs(&paths); + } + + #[test] + fn test_account_many() { + let paths = "many0,many1".to_string(); + let accounts = AccountsDB::new(0, &paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 100, 0); + for _ in 1..100 { + let idx = thread_rng().gen_range(0, 99); + let account = accounts.load(0, &pubkeys[idx], true).unwrap(); + let mut default_account = Account::default(); + default_account.tokens = (idx + 1) as u64; + assert_eq!(compare_account(&default_account, &account), true); + } + cleanup_dirs(&paths); + } + + #[test] + fn test_account_update() { + let paths = "update0".to_string(); + let accounts = AccountsDB::new(0, &paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 100, 0); + update_accounts(&accounts, pubkeys, 99); + { + let stores = accounts.storage.read().unwrap(); + assert_eq!(stores.len(), 1); + assert_eq!(stores[0].count.load(Ordering::Relaxed), 100); + assert_eq!( + stores[0].get_status(), + AccountStorageStatus::StorageAvailable + ); + } + cleanup_dirs(&paths); + } + + #[test] + fn test_account_grow() { + let paths = "grow0".to_string(); + let accounts = AccountsDB::new(0, &paths); + let count = [0, 1]; + let status = [ + AccountStorageStatus::StorageAvailable, + AccountStorageStatus::StorageFull, + ]; + let pubkey1 = Keypair::new().pubkey(); + let account1 = Account::new(1, ACCOUNT_DATA_FILE_SIZE as usize / 2, pubkey1); + accounts.store(0, true, &pubkey1, &account1); + { + let stores = accounts.storage.read().unwrap(); + assert_eq!(stores.len(), 1); + assert_eq!(stores[0].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[0].get_status(), status[0]); + } + + let pubkey2 = Keypair::new().pubkey(); + let account2 = Account::new(1, ACCOUNT_DATA_FILE_SIZE as usize / 2, pubkey2); + accounts.store(0, true, &pubkey2, &account2); + { + let stores = accounts.storage.read().unwrap(); + assert_eq!(stores.len(), 2); + assert_eq!(stores[0].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[0].get_status(), status[1]); + assert_eq!(stores[1].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[1].get_status(), status[0]); + } + assert_eq!(accounts.load(0, &pubkey1, true).unwrap(), account1); + assert_eq!(accounts.load(0, &pubkey2, true).unwrap(), account2); + + for i in 0..25 { + let index = i % 2; + accounts.store(0, true, &pubkey1, &account1); + { + let stores = accounts.storage.read().unwrap(); + assert_eq!(stores.len(), 3); + assert_eq!(stores[0].count.load(Ordering::Relaxed), count[index]); + assert_eq!(stores[0].get_status(), status[0]); + assert_eq!(stores[1].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[1].get_status(), status[1]); + assert_eq!(stores[2].count.load(Ordering::Relaxed), count[index ^ 1]); + assert_eq!(stores[2].get_status(), status[0]); + } + assert_eq!(accounts.load(0, &pubkey1, true).unwrap(), account1); + assert_eq!(accounts.load(0, &pubkey2, true).unwrap(), account2); + } + cleanup_dirs(&paths); + } + + #[test] + fn test_account_vote() { + let paths = "vote0".to_string(); + let accounts = AccountsDB::new(0, &paths); + let mut pubkeys: Vec = vec![]; + create_account(&accounts, &mut pubkeys, 100, 6); + let accounts = accounts.get_vote_accounts(0); + assert_eq!(accounts.len(), 6); + accounts.iter().for_each(|account| { + assert_eq!(account.owner, vote_program::id()); + }); + cleanup_dirs(&paths); + } +} diff --git a/runtime/src/accounts_trait.rs b/runtime/src/accounts_trait.rs new file mode 100644 index 00000000000000..1a3cc30b87742f --- /dev/null +++ b/runtime/src/accounts_trait.rs @@ -0,0 +1,50 @@ +use crate::bank::{BankError, Result}; +use crate::runtime::has_duplicates; +use bincode::serialize; +use hashbrown::{HashMap, HashSet}; +use log::debug; +use solana_metrics::counter::Counter; +use solana_sdk::account::Account; +use solana_sdk::hash::{hash, Hash}; +use solana_sdk::native_loader; +use solana_sdk::pubkey::Pubkey; +use solana_sdk::transaction::Transaction; +use std::collections::BTreeMap; +use std::ops::Deref; +use std::sync::{Mutex, RwLock}; + +pub type InstructionAccounts = Vec; +pub type InstructionLoaders = Vec>; +pub type Fork = u64; + +#[derive(Debug, Default)] +pub struct ErrorCounters { + pub account_not_found: usize, + pub account_in_use: usize, + pub account_loaded_twice: usize, + pub last_id_not_found: usize, + pub last_id_too_old: usize, + pub reserve_last_id: usize, + pub insufficient_funds: usize, + pub duplicate_signature: usize, + pub call_chain_too_deep: usize, + pub missing_signature_for_fee: usize, +} + +pub trait AccountsStore { + fn new(fork: Fork) -> Self; + fn new_from_parent(fork: Fork, parent: Fork) -> Self; + fn load_slow(fork: Fork, pubkey: &Pubkey) -> Option; + fn load_slow_no_parent(fork: Fork, pubkey: &Pubkey) -> Option; + fn store_slow(fork: Fork, purge: bool, pubkey: &Pubkey, account: &Account); + fn hash_internal_state(fork: Fork) -> Option; + fn lock_accounts(fork: Fork, txs: &[Transaction]) -> Vec>; + fn unlock_accounts(fork: Fork, txs: &[Transaction], results: &[Result<()>]); + fn has_accounts(fork: Fork) -> bool; + fn load_accounts(fork: Fork, txs: &[Transaction], results: Vec>, error_counters: &mut ErrorCounters) -> Vec>; + fn store_accounts(fork: Fork, purge: bool, txs: &[Transaction], res: &[Result<()>], loaded: &[Result<(InstructionAccounts, InstructionLoaders)>]); + fn increment_transaction_count(&self, fork: Fork, tx_count: usize); + fn transaction_count(&self, fork: Fork) -> u64; + fn squash(&self, fork: Fork); + fn get_vote_accounts(&self) -> Vec; +} diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index 18ed94491fdf88..083e2e68813611 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -1,10 +1,13 @@ +use bincode::{deserialize_from, serialize_into, serialized_size}; use memmap::MmapMut; +use serde::{Deserialize, Serialize}; use solana_sdk::account::Account; use solana_sdk::pubkey::Pubkey; +use std::fmt; use std::fs::OpenOptions; -use std::io::{Seek, SeekFrom, Write}; +use std::io::{Cursor, Seek, SeekFrom, Write}; use std::mem; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; @@ -26,7 +29,7 @@ pub struct StorageMeta { pub data_len: u64, } -#[derive(Serialize, Deserialize, Clone, Default, Eq, PartialEq)] +#[derive(Serialize, Deserialize, Clone, Debug, Default, Eq, PartialEq)] pub struct AccountBalance { /// lamports in the account pub lamports: u64, @@ -38,6 +41,7 @@ pub struct AccountBalance { /// References to Memory Mapped memory /// The Account is stored separately from its data, so getting the actual account requires a clone +#[derive(PartialEq, Debug)] pub struct StoredAccount<'a> { pub meta: &'a StorageMeta, /// account data @@ -56,7 +60,9 @@ impl<'a> StoredAccount<'a> { } } +#[derive(Debug)] pub struct AppendVec { + path: PathBuf, map: MmapMut, // This mutex forces append to be single threaded, but concurrent with reads append_offset: Mutex, @@ -82,6 +88,7 @@ impl AppendVec { let map = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") }; AppendVec { + path: file.to_path_buf(), map, // This mutex forces append to be single threaded, but concurrent with reads // See UNSAFE usage in `append_ptr` @@ -273,6 +280,74 @@ pub mod test_utils { } } +impl Serialize for AppendVec { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::ser::Serializer, + { + use serde::ser::Error; + let len = + serialized_size(&self.path).unwrap() + 4 * serialized_size(&self.file_size).unwrap(); + let mut buf = vec![0u8; len as usize]; + let mut wr = Cursor::new(&mut buf[..]); + serialize_into(&mut wr, &self.path).map_err(Error::custom)?; + serialize_into( + &mut wr, + &(self.current_len.load(Ordering::Relaxed) as u64), + ) + .map_err(Error::custom)?; + serialize_into(&mut wr, &self.file_size).map_err(Error::custom)?; + let len = wr.position() as usize; + serializer.serialize_bytes(&wr.into_inner()[..len]) + } +} + +struct AppendVecVisitor; + +impl<'a> serde::de::Visitor<'a> for AppendVecVisitor { + type Value = AppendVec; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting AppendVec") + } + + fn visit_bytes(self, data: &[u8]) -> std::result::Result + where + E: serde::de::Error, + { + use serde::de::Error; + let mut rd = Cursor::new(&data[..]); + let path: PathBuf = deserialize_from(&mut rd).map_err(Error::custom)?; + let current_len: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; + let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; + + let data = OpenOptions::new() + .read(true) + .write(true) + .create(false) + .open(path.as_path()) + .unwrap(); + + let map = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") }; + Ok(AppendVec { + path, + map, + append_offset: Mutex::new(0), + current_len: AtomicUsize::new(current_len as usize), + file_size, + }) + } +} + +impl<'de> Deserialize<'de> for AppendVec { + fn deserialize(deserializer: D) -> std::result::Result + where + D: ::serde::Deserializer<'de>, + { + deserializer.deserialize_bytes(AppendVecVisitor) + } +} + #[cfg(test)] pub mod tests { use super::test_utils::*; @@ -342,4 +417,30 @@ pub mod tests { duration_as_ms(&now.elapsed()), ); } + + #[test] + fn test_append_vec_serialize() { + let path = Path::new("append_vec_serialize"); + let av: AppendVec = AppendVec::new(path, true, 1024 * 1024); + let account1 = create_test_account(1); + let index1 = av.append_account_test(&account1).unwrap(); + assert_eq!(index1, 0); + assert_eq!(av.get_account_test(index1).unwrap(), account1); + + let account2 = create_test_account(2); + let index2 = av.append_account_test(&account2).unwrap(); + assert_eq!(av.get_account_test(index2).unwrap(), account2); + assert_eq!(av.get_account_test(index1).unwrap(), account1); + + let mut buf = vec![0u8; serialized_size(&av).unwrap() as usize]; + let mut writer = Cursor::new(&mut buf[..]); + serialize_into(&mut writer, &av).unwrap(); + + let mut reader = Cursor::new(&mut buf[..]); + let dav: AppendVec = deserialize_from(&mut reader).unwrap(); + + assert_eq!(dav.get_account_test(index2).unwrap(), account2); + assert_eq!(dav.get_account_test(index1).unwrap(), account1); + std::fs::remove_file(path).unwrap(); + } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 49a0973fb82d7e..1880c37e95e891 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -8,10 +8,13 @@ use crate::accounts_db::{ErrorCounters, InstructionAccounts, InstructionLoaders} use crate::blockhash_queue::BlockhashQueue; use crate::locked_accounts_results::LockedAccountsResults; use crate::message_processor::{MessageProcessor, ProcessInstruction}; +use crate::serde_utils::{ + deserialize_atomicbool, deserialize_atomicusize, serialize_atomicbool, serialize_atomicusize, +}; use crate::status_cache::StatusCache; use bincode::serialize; -use hashbrown::HashMap; use log::*; +use serde::{Deserialize, Serialize}; use solana_metrics::counter::Counter; use solana_metrics::influxdb; use solana_sdk::account::Account; @@ -25,11 +28,12 @@ use solana_sdk::system_transaction; use solana_sdk::timing::{duration_as_ms, duration_as_us, MAX_RECENT_BLOCKHASHES}; use solana_sdk::transaction::{Result, Transaction, TransactionError}; use solana_vote_api::vote_state::{self, Vote}; +use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::time::Instant; -#[derive(Default, Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Default, Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)] pub struct EpochSchedule { /// The maximum number of slots in each epoch. pub slots_per_epoch: u64, @@ -108,7 +112,7 @@ impl EpochSchedule { type BankStatusCache = StatusCache>; /// Manager for the state of all accounts and programs after processing its entries. -#[derive(Default)] +#[derive(Deserialize, Serialize, Default)] pub struct Bank { /// where all the Accounts are stored accounts: Arc, @@ -132,9 +136,13 @@ pub struct Bank { parent_hash: Hash, /// The number of transactions processed without error + #[serde(serialize_with = "serialize_atomicusize")] + #[serde(deserialize_with = "deserialize_atomicusize")] transaction_count: AtomicUsize, // TODO: Use AtomicU64 if/when available /// Bank tick height + #[serde(serialize_with = "serialize_atomicusize")] + #[serde(deserialize_with = "deserialize_atomicusize")] tick_height: AtomicUsize, // TODO: Use AtomicU64 if/when available // Bank max_tick_height @@ -164,6 +172,8 @@ pub struct Bank { /// A boolean reflecting whether any entries were recorded into the PoH /// stream for the slot == self.slot + #[serde(serialize_with = "serialize_atomicbool")] + #[serde(deserialize_with = "deserialize_atomicbool")] is_delta: AtomicBool, /// The Message processor @@ -858,6 +868,14 @@ impl Bank { self.store(pubkey, &account); } + pub fn accounts(&self) -> Arc { + self.accounts.clone() + } + + pub fn set_accounts(&mut self, accounts: &Arc) { + self.accounts = accounts.clone(); + } + pub fn get_account(&self, pubkey: &Pubkey) -> Option { self.accounts.load_slow(&self.ancestors, pubkey) } @@ -1026,6 +1044,37 @@ impl Bank { false } + + pub fn compare_bank(&self, dbank: &Bank) { + assert_eq!(self.slot, dbank.slot); + assert_eq!(self.collector_id, dbank.collector_id); + assert_eq!(self.epoch_schedule, dbank.epoch_schedule); + assert_eq!(self.epoch_vote_accounts, dbank.epoch_vote_accounts); + assert_eq!(self.ticks_per_slot, dbank.ticks_per_slot); + assert_eq!(self.parent_hash, dbank.parent_hash); + assert_eq!( + self.tick_height.load(Ordering::SeqCst), + dbank.tick_height.load(Ordering::SeqCst) + ); + assert_eq!( + self.is_delta.load(Ordering::SeqCst), + dbank.is_delta.load(Ordering::SeqCst) + ); + + let bh = self.hash.read().unwrap(); + let dbh = dbank.hash.read().unwrap(); + assert_eq!(*bh, *dbh); + + let bhq = self.blockhash_queue.read().unwrap(); + let dbhq = dbank.blockhash_queue.read().unwrap(); + assert_eq!(*bhq, *dbhq); + + let sc = self.status_cache.read().unwrap(); + let dsc = dbank.status_cache.read().unwrap(); + assert_eq!(*sc, *dsc); + + Accounts::compare_accounts(&self.accounts, &dbank.accounts); + } } impl Drop for Bank { @@ -1038,6 +1087,7 @@ impl Drop for Bank { #[cfg(test)] mod tests { use super::*; + use bincode::{deserialize_from, serialize_into, serialized_size}; use solana_sdk::genesis_block::{GenesisBlock, BOOTSTRAP_LEADER_LAMPORTS}; use solana_sdk::hash; use solana_sdk::instruction::InstructionError; @@ -1046,6 +1096,7 @@ mod tests { use solana_sdk::system_transaction; use solana_vote_api::vote_instruction; use solana_vote_api::vote_state::VoteState; + use std::io::Cursor; #[test] fn test_bank_new_no_parent() { @@ -1935,4 +1986,19 @@ mod tests { bank.tick_height.store(max_tick_height, Ordering::Relaxed); assert!(bank.is_votable()); } + + #[test] + fn test_bank_serialize() { + let (genesis_block, _) = GenesisBlock::new(500); + let bank0 = Arc::new(Bank::new(&genesis_block)); + let bank = new_from_parent(&bank0); + + let mut buf = vec![0u8; serialized_size(&bank).unwrap() as usize]; + let mut writer = Cursor::new(&mut buf[..]); + serialize_into(&mut writer, &bank).unwrap(); + + let mut reader = Cursor::new(&mut buf[..]); + let dbank: Bank = deserialize_from(&mut reader).unwrap(); + bank.compare_bank(&dbank); + } } diff --git a/runtime/src/blockhash_queue.rs b/runtime/src/blockhash_queue.rs index 2143be4341c3de..8ad557f880725c 100644 --- a/runtime/src/blockhash_queue.rs +++ b/runtime/src/blockhash_queue.rs @@ -1,15 +1,16 @@ -use hashbrown::HashMap; +use serde::{Deserialize, Serialize}; use solana_sdk::hash::Hash; use solana_sdk::timing::timestamp; +use std::collections::HashMap; -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] struct HashAge { timestamp: u64, hash_height: u64, } /// Low memory overhead, so can be cloned for every checkpoint -#[derive(Clone)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct BlockhashQueue { /// updated whenever an hash is registered hash_height: u64, diff --git a/runtime/src/bloom.rs b/runtime/src/bloom.rs index c1674cffd14ea4..ede157eb6e8270 100644 --- a/runtime/src/bloom.rs +++ b/runtime/src/bloom.rs @@ -16,7 +16,8 @@ pub trait BloomHashIndex { #[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)] pub struct Bloom { pub keys: Vec, - pub bits: BitVec, + pub bits: BitVec, + num_bits_set: u64, _phantom: PhantomData, } @@ -26,6 +27,7 @@ impl Bloom { Bloom { keys, bits, + num_bits_set: 0, _phantom: PhantomData::default(), } } @@ -47,11 +49,15 @@ impl Bloom { } pub fn clear(&mut self) { self.bits = BitVec::new_fill(false, self.bits.len()); + self.num_bits_set = 0; } pub fn add(&mut self, key: &T) { for k in &self.keys { let pos = self.pos(key, *k); - self.bits.set(pos, true); + if !self.bits.get(pos) { + self.num_bits_set += 1; + self.bits.set(pos, true); + } } } pub fn contains(&self, key: &T) -> bool { diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 3a91d1b1101eab..550dd4a1472551 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -10,6 +10,7 @@ pub mod loader_utils; pub mod locked_accounts_results; pub mod message_processor; mod native_loader; +mod serde_utils; mod status_cache; mod system_instruction_processor; diff --git a/runtime/src/message_processor.rs b/runtime/src/message_processor.rs index fa7c14669ffa36..977a24b9dcc315 100644 --- a/runtime/src/message_processor.rs +++ b/runtime/src/message_processor.rs @@ -1,5 +1,6 @@ use crate::native_loader; use crate::system_instruction_processor; +use serde::{Deserialize, Serialize}; use solana_sdk::account::{create_keyed_accounts, Account, KeyedAccount}; use solana_sdk::instruction::{CompiledInstruction, InstructionError}; use solana_sdk::message::Message; @@ -75,7 +76,9 @@ fn verify_instruction( pub type ProcessInstruction = fn(&Pubkey, &mut [KeyedAccount], &[u8], u64) -> Result<(), InstructionError>; +#[derive(Serialize, Deserialize)] pub struct MessageProcessor { + #[serde(skip)] instruction_processors: Vec<(Pubkey, ProcessInstruction)>, } diff --git a/runtime/src/serde_utils.rs b/runtime/src/serde_utils.rs new file mode 100644 index 00000000000000..f4376d83b114ea --- /dev/null +++ b/runtime/src/serde_utils.rs @@ -0,0 +1,62 @@ +use std::fmt; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + +struct U64Visitor; +impl<'a> serde::de::Visitor<'a> for U64Visitor { + type Value = u64; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting u64") + } + fn visit_u64(self, data: u64) -> std::result::Result + where + E: serde::de::Error, + { + Ok(data) + } +} + +pub fn deserialize_atomicusize<'de, D>(d: D) -> Result +where + D: serde::de::Deserializer<'de>, +{ + let value = d.deserialize_u64(U64Visitor)?; + Ok(AtomicUsize::new(value as usize)) +} + +pub fn serialize_atomicusize(x: &AtomicUsize, s: S) -> Result +where + S: serde::Serializer, +{ + s.serialize_u64(x.load(Ordering::SeqCst) as u64) +} + +struct BoolVisitor; +impl<'a> serde::de::Visitor<'a> for BoolVisitor { + type Value = bool; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting bool") + } + fn visit_bool(self, data: bool) -> std::result::Result + where + E: serde::de::Error, + { + Ok(data) + } +} + +pub fn deserialize_atomicbool<'de, D>(d: D) -> Result +where + D: serde::de::Deserializer<'de>, +{ + let value = d.deserialize_bool(BoolVisitor)?; + Ok(AtomicBool::new(value)) +} + +pub fn serialize_atomicbool(x: &AtomicBool, s: S) -> Result +where + S: serde::Serializer, +{ + s.serialize_bool(x.load(Ordering::SeqCst)) +} diff --git a/runtime/src/status_cache.rs b/runtime/src/status_cache.rs index a1d377f778b364..9c4d865ce921a8 100644 --- a/runtime/src/status_cache.rs +++ b/runtime/src/status_cache.rs @@ -1,7 +1,8 @@ -use hashbrown::{HashMap, HashSet}; use log::*; +use serde::{Deserialize, Serialize}; use solana_sdk::hash::Hash; use solana_sdk::signature::Signature; +use std::collections::{HashMap, HashSet}; const MAX_CACHE_ENTRIES: usize = solana_sdk::timing::MAX_HASH_AGE_IN_SECONDS; @@ -11,6 +12,7 @@ pub type ForkStatus = Vec<(ForkId, T)>; type SignatureMap = HashMap>; type StatusMap = HashMap)>; +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] pub struct StatusCache { /// all signatures seen during a hash period cache: StatusMap, @@ -97,7 +99,9 @@ impl StatusCache { #[cfg(test)] mod tests { use super::*; + use bincode::{deserialize_from, serialize_into, serialized_size}; use solana_sdk::hash::hash; + use std::io::Cursor; type BankStatusCache = StatusCache<()>; @@ -242,4 +246,29 @@ mod tests { .get_signature_status(&sig, &blockhash, &ancestors) .is_some()); } + + fn test_serialize(sc: &BankStatusCache) { + let mut buf = vec![0u8; serialized_size(sc).unwrap() as usize]; + let mut writer = Cursor::new(&mut buf[..]); + serialize_into(&mut writer, sc).unwrap(); + + let mut reader = Cursor::new(&mut buf[..]); + let deser: BankStatusCache = deserialize_from(&mut reader).unwrap(); + assert_eq!(*sc, deser); + } + + #[test] + fn test_statuscache_serialize() { + let sig = Signature::default(); + let mut status_cache = BankStatusCache::default(); + let blockhash = hash(Hash::default().as_ref()); + status_cache.add_root(0); + status_cache.clear_signatures(); + status_cache.insert(&blockhash, &sig, 0, ()); + test_serialize(&status_cache); + + let new_blockhash = hash(Hash::default().as_ref()); + status_cache.insert(&new_blockhash, &sig, 1, ()); + test_serialize(&status_cache); + } } diff --git a/sdk/src/fee_calculator.rs b/sdk/src/fee_calculator.rs index a60deafcf7d66d..a82ca06bdb05f9 100644 --- a/sdk/src/fee_calculator.rs +++ b/sdk/src/fee_calculator.rs @@ -1,6 +1,6 @@ use crate::message::Message; -#[derive(Default)] +#[derive(Deserialize, Serialize, Default)] pub struct FeeCalculator { pub lamports_per_signature: u64, }