From ee865bf7a3fb1e487e181f5d6a8037128f726c65 Mon Sep 17 00:00:00 2001 From: Sathish Ambley Date: Sun, 12 May 2019 17:04:15 -0700 Subject: [PATCH] keep saved and unsaved copies of status cache --- core/src/bank_forks.rs | 57 ++++++++---- runtime/src/bank.rs | 101 ++++++++++++++++---- runtime/src/status_cache.rs | 178 +++++++++++++++++++++++++++++------- 3 files changed, 265 insertions(+), 71 deletions(-) diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index 6e5ffbedb1d7cb..e244ef1f697f56 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -2,7 +2,7 @@ use bincode::{deserialize_from, serialize_into}; use solana_metrics::inc_new_counter_info; -use solana_runtime::bank::{Bank, BankRc}; +use solana_runtime::bank::{Bank, BankRc, StatusCacheRc}; use solana_sdk::timing; use std::collections::{HashMap, HashSet}; use std::env; @@ -157,7 +157,7 @@ impl BankForks { if **slot > root { let _ = self.add_snapshot(**slot, root); } else if **slot > 0 { - self.remove_snapshot(**slot); + BankForks::remove_snapshot(**slot); } } } @@ -191,14 +191,16 @@ impl BankForks { } serialize_into(&mut stream, &parent_slot) .map_err(|_| BankForks::get_io_error("serialize bank parent error"))?; - serialize_into(&mut stream, &bank.rc) - .map_err(|_| BankForks::get_io_error("serialize bank rc error"))?; serialize_into(&mut stream, &root) .map_err(|_| BankForks::get_io_error("serialize root error"))?; + serialize_into(&mut stream, &bank.src) + .map_err(|_| BankForks::get_io_error("serialize bank status cache error"))?; + serialize_into(&mut stream, &bank.rc) + .map_err(|_| BankForks::get_io_error("serialize bank accounts error"))?; Ok(()) } - pub fn remove_snapshot(&self, slot: u64) { + pub fn remove_snapshot(slot: u64) { let path = BankForks::get_snapshot_path(); let bank_file = format!("{}", slot); let bank_file_path = path.join(bank_file); @@ -212,14 +214,15 @@ impl BankForks { fn setup_banks( bank_maps: &mut Vec<(u64, u64, Bank)>, bank_rc: &BankRc, + status_cache_rc: &StatusCacheRc, ) -> (HashMap>, HashSet, u64) { let mut banks = HashMap::new(); let mut slots = HashSet::new(); let (last_slot, last_parent_slot, mut last_bank) = bank_maps.remove(0); - last_bank.set_bank_rc(&bank_rc); + last_bank.set_bank_rc(&bank_rc, &status_cache_rc); while let Some((slot, parent_slot, mut bank)) = bank_maps.pop() { - bank.set_bank_rc(&bank_rc); + bank.set_bank_rc(&bank_rc, &status_cache_rc); if parent_slot != 0 { if let Some(parent) = banks.get(&parent_slot) { bank.set_parent(parent); @@ -255,7 +258,8 @@ impl BankForks { names.sort(); let mut bank_maps = vec![]; let mut bank_rc: Option = None; - let mut root: u64 = 0; + let status_cache_rc = StatusCacheRc::default(); + let mut bank_forks_root: u64 = 0; for bank_slot in names.iter().rev() { let bank_path = format!("{}", bank_slot); let bank_file_path = path.join(bank_path.clone()); @@ -267,33 +271,44 @@ impl BankForks { let slot: Result = deserialize_from(&mut stream) .map_err(|_| BankForks::get_io_error("deserialize bank parent error")); let parent_slot = if slot.is_ok() { slot.unwrap() } else { 0 }; + let root: Result = deserialize_from(&mut stream) + .map_err(|_| BankForks::get_io_error("deserialize root error")); + let status_cache: Result = deserialize_from(&mut stream) + .map_err(|_| BankForks::get_io_error("deserialize bank status cache error")); if bank_rc.is_none() { let rc: Result = deserialize_from(&mut stream) - .map_err(|_| BankForks::get_io_error("deserialize bank rc error")); + .map_err(|_| BankForks::get_io_error("deserialize bank accounts error")); if rc.is_ok() { bank_rc = Some(rc.unwrap()); - let r: Result = deserialize_from(&mut stream) - .map_err(|_| BankForks::get_io_error("deserialize root error")); - if r.is_ok() { - root = r.unwrap(); - } + bank_forks_root = root.unwrap(); } } - match bank { - Ok(v) => bank_maps.push((*bank_slot, parent_slot, v)), - Err(_) => warn!("Load snapshot failed for {}", bank_slot), + if bank_rc.is_some() { + match bank { + Ok(v) => { + if status_cache.is_ok() { + status_cache_rc.append(&status_cache.unwrap()); + } + bank_maps.push((*bank_slot, parent_slot, v)); + } + Err(_) => warn!("Load snapshot failed for {}", bank_slot), + } + } else { + BankForks::remove_snapshot(*bank_slot); + warn!("Load snapshot rc failed for {}", bank_slot); } } if bank_maps.is_empty() || bank_rc.is_none() { return Err(Error::new(ErrorKind::Other, "no snapshots loaded")); } - let (banks, slots, last_slot) = BankForks::setup_banks(&mut bank_maps, &bank_rc.unwrap()); + let (banks, slots, last_slot) = + BankForks::setup_banks(&mut bank_maps, &bank_rc.unwrap(), &status_cache_rc); let working_bank = banks[&last_slot].clone(); Ok(BankForks { banks, working_bank, - root, + root: bank_forks_root, slots, use_snapshot: true, }) @@ -434,7 +449,7 @@ mod tests { bank.compare_bank(&new_bank); } for (slot, _) in new.banks.iter() { - new.remove_snapshot(*slot); + BankForks::remove_snapshot(*slot); } } @@ -461,6 +476,8 @@ mod tests { bank.freeze(); bank_forks.insert(bank); save_and_load_snapshot(&bank_forks); + let bank = bank_forks.get(index).unwrap(); + bank.clear_signatures(); } assert_eq!(bank_forks.working_bank().slot(), 10); } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index d6bef569f5ad37..03cba53a7a5e73 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -142,9 +142,6 @@ pub struct BankRc { /// where all the Accounts are stored accounts: Arc, - /// A cache of signature statuses - status_cache: Arc>, - /// Previous checkpoint of this bank parent: RwLock>>, } @@ -156,13 +153,11 @@ impl Serialize for BankRc { { use serde::ser::Error; let len = serialized_size(&*self.accounts.accounts_db).unwrap() - + serialized_size(&*self.accounts).unwrap() - + serialized_size(&*self.status_cache).unwrap(); + + serialized_size(&*self.accounts).unwrap(); let mut buf = vec![0u8; len as usize]; let mut wr = Cursor::new(&mut buf[..]); - serialize_into(&mut wr, &*self.accounts.accounts_db).map_err(Error::custom)?; serialize_into(&mut wr, &*self.accounts).map_err(Error::custom)?; - serialize_into(&mut wr, &*self.status_cache).map_err(Error::custom)?; + serialize_into(&mut wr, &*self.accounts.accounts_db).map_err(Error::custom)?; let len = wr.position() as usize; serializer.serialize_bytes(&wr.into_inner()[..len]) } @@ -184,14 +179,12 @@ impl<'a> serde::de::Visitor<'a> for BankRcVisitor { { use serde::de::Error; let mut rd = Cursor::new(&data[..]); - let accounts_db: AccountsDB = deserialize_from(&mut rd).map_err(Error::custom)?; let mut accounts: Accounts = deserialize_from(&mut rd).map_err(Error::custom)?; - let status_cache: BankStatusCache = deserialize_from(&mut rd).map_err(Error::custom)?; + let accounts_db: AccountsDB = deserialize_from(&mut rd).map_err(Error::custom)?; accounts.accounts_db = Arc::new(accounts_db); Ok(BankRc { accounts: Arc::new(accounts), - status_cache: Arc::new(RwLock::new(status_cache)), parent: RwLock::new(None), }) } @@ -206,6 +199,71 @@ impl<'de> Deserialize<'de> for BankRc { } } +#[derive(Default)] +pub struct StatusCacheRc { + /// where all the Accounts are stored + /// A cache of signature statuses + status_cache: Arc>, +} + +impl Serialize for StatusCacheRc { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::ser::Serializer, + { + use serde::ser::Error; + let len = serialized_size(&*self.status_cache).unwrap(); + let mut buf = vec![0u8; len as usize]; + let mut wr = Cursor::new(&mut buf[..]); + { + let mut status_cache = self.status_cache.write().unwrap(); + serialize_into(&mut wr, &*status_cache).map_err(Error::custom)?; + status_cache.merge_caches(); + } + let len = wr.position() as usize; + serializer.serialize_bytes(&wr.into_inner()[..len]) + } +} + +struct StatusCacheRcVisitor; + +impl<'a> serde::de::Visitor<'a> for StatusCacheRcVisitor { + type Value = StatusCacheRc; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("Expecting StatusCacheRc") + } + + #[allow(clippy::mutex_atomic)] + fn visit_bytes(self, data: &[u8]) -> std::result::Result + where + E: serde::de::Error, + { + use serde::de::Error; + let mut rd = Cursor::new(&data[..]); + let status_cache: BankStatusCache = deserialize_from(&mut rd).map_err(Error::custom)?; + Ok(StatusCacheRc { + status_cache: Arc::new(RwLock::new(status_cache)), + }) + } +} + +impl<'de> Deserialize<'de> for StatusCacheRc { + fn deserialize(deserializer: D) -> std::result::Result + where + D: ::serde::Deserializer<'de>, + { + deserializer.deserialize_bytes(StatusCacheRcVisitor) + } +} + +impl StatusCacheRc { + pub fn append(&self, status_cache_rc: &StatusCacheRc) { + let sc = status_cache_rc.status_cache.write().unwrap(); + self.status_cache.write().unwrap().append(&sc); + } +} + /// Manager for the state of all accounts and programs after processing its entries. #[derive(Default, Deserialize, Serialize)] pub struct Bank { @@ -213,6 +271,9 @@ pub struct Bank { #[serde(skip)] pub rc: BankRc, + #[serde(skip)] + pub src: StatusCacheRc, + /// FIFO queue of `recent_blockhash` items blockhash_queue: RwLock, @@ -305,7 +366,7 @@ impl Bank { let mut bank = Self::default(); bank.blockhash_queue = RwLock::new(parent.blockhash_queue.read().unwrap().clone()); - bank.rc.status_cache = parent.rc.status_cache.clone(); + bank.src.status_cache = parent.src.status_cache.clone(); bank.bank_height = parent.bank_height + 1; bank.fee_calculator = parent.fee_calculator.clone(); @@ -399,7 +460,7 @@ impl Bank { let squash_cache_start = Instant::now(); parents .iter() - .for_each(|p| self.rc.status_cache.write().unwrap().add_root(p.slot())); + .for_each(|p| self.src.status_cache.write().unwrap().add_root(p.slot())); let squash_cache_ms = duration_as_ms(&squash_cache_start.elapsed()); datapoint!( @@ -483,7 +544,7 @@ impl Bank { /// Forget all signatures. Useful for benchmarking. pub fn clear_signatures(&self) { - self.rc.status_cache.write().unwrap().clear_signatures(); + self.src.status_cache.write().unwrap().clear_signatures(); } pub fn can_commit(result: &Result<()>) -> bool { @@ -495,7 +556,7 @@ impl Bank { } fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) { - let mut status_cache = self.rc.status_cache.write().unwrap(); + let mut status_cache = self.src.status_cache.write().unwrap(); for (i, tx) in txs.iter().enumerate() { if Self::can_commit(&res[i]) && !tx.signatures.is_empty() { status_cache.insert( @@ -678,7 +739,7 @@ impl Bank { lock_results: Vec>, error_counters: &mut ErrorCounters, ) -> Vec> { - let rcache = self.rc.status_cache.read().unwrap(); + let rcache = self.src.status_cache.read().unwrap(); txs.iter() .zip(lock_results.into_iter()) .map(|(tx, lock_res)| { @@ -978,9 +1039,9 @@ impl Bank { self.rc.accounts.clone() } - pub fn set_bank_rc(&mut self, bank_rc: &BankRc) { + pub fn set_bank_rc(&mut self, bank_rc: &BankRc, status_cache_rc: &StatusCacheRc) { self.rc.accounts = bank_rc.accounts.clone(); - self.rc.status_cache = bank_rc.status_cache.clone() + self.src.status_cache = status_cache_rc.status_cache.clone() } pub fn set_parent(&mut self, parent: &Arc) { @@ -1018,7 +1079,7 @@ impl Bank { &self, signature: &Signature, ) -> Option<(usize, Result<()>)> { - let rcache = self.rc.status_cache.read().unwrap(); + let rcache = self.src.status_cache.read().unwrap(); rcache.get_signature_status_slow(signature, &self.ancestors) } @@ -1165,8 +1226,8 @@ impl Bank { let dbhq = dbank.blockhash_queue.read().unwrap(); assert_eq!(*bhq, *dbhq); - let sc = self.rc.status_cache.read().unwrap(); - let dsc = dbank.rc.status_cache.read().unwrap(); + let sc = self.src.status_cache.read().unwrap(); + let dsc = dbank.src.status_cache.read().unwrap(); assert_eq!(*sc, *dsc); assert_eq!( self.rc.accounts.hash_internal_state(self.slot), diff --git a/runtime/src/status_cache.rs b/runtime/src/status_cache.rs index 5c7ffa40b7d55f..19adee9eb5c9f6 100644 --- a/runtime/src/status_cache.rs +++ b/runtime/src/status_cache.rs @@ -1,4 +1,5 @@ use log::*; +use serde::ser::SerializeSeq; use serde::{Deserialize, Serialize}; use solana_sdk::hash::Hash; use solana_sdk::signature::Signature; @@ -12,23 +13,36 @@ pub type ForkStatus = Vec<(ForkId, T)>; type SignatureMap = HashMap>; type StatusMap = HashMap)>; -#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] -pub struct StatusCache { +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct StatusCache { /// all signatures seen during a hash period - cache: StatusMap, + #[serde(serialize_with = "serialize_statusmap")] + cache: Vec>, roots: HashSet, } -impl Default for StatusCache { +fn serialize_statusmap(x: &[StatusMap], s: S) -> Result +where + T: serde::Serialize + Clone, + S: serde::Serializer, +{ + let cache0: StatusMap = HashMap::new(); + let mut seq = s.serialize_seq(Some(x.len()))?; + seq.serialize_element(&cache0)?; + seq.serialize_element(&x[1])?; + seq.end() +} + +impl Default for StatusCache { fn default() -> Self { Self { - cache: HashMap::default(), + cache: vec![HashMap::default(); 2], roots: HashSet::default(), } } } -impl StatusCache { +impl StatusCache { /// Check if the signature from a transaction is in any of the forks in the ancestors set. pub fn get_signature_status( &self, @@ -36,13 +50,24 @@ impl StatusCache { transaction_blockhash: &Hash, ancestors: &HashMap, ) -> Option<(ForkId, T)> { - let (_, sigmap) = self.cache.get(transaction_blockhash)?; - let stored_forks = sigmap.get(sig)?; - stored_forks - .iter() - .filter(|(f, _)| ancestors.get(f).is_some() || self.roots.get(f).is_some()) - .nth(0) - .cloned() + for cache in self.cache.iter() { + let map = cache.get(transaction_blockhash); + if map.is_none() { + continue; + } + let (_, sigmap) = map.unwrap(); + if let Some(stored_forks) = sigmap.get(sig) { + let res = stored_forks + .iter() + .filter(|(f, _)| ancestors.get(f).is_some() || self.roots.get(f).is_some()) + .nth(0) + .cloned(); + if res.is_some() { + return res; + } + } + } + None } /// TODO: wallets should send the Transactions recent blockhash as well @@ -52,7 +77,12 @@ impl StatusCache { ancestors: &HashMap, ) -> Option<(usize, T)> { trace!("get_signature_status_slow"); - for blockhash in self.cache.keys() { + let mut keys = vec![]; + for cache in self.cache.iter() { + let mut val: Vec<_> = cache.iter().map(|(k, _)| *k).collect(); + keys.append(&mut val); + } + for blockhash in keys.iter() { trace!("get_signature_status_slow: trying {}", blockhash); if let Some((forkid, res)) = self.get_signature_status(sig, blockhash, ancestors) { trace!("get_signature_status_slow: got {}", forkid); @@ -72,26 +102,62 @@ impl StatusCache { if self.roots.len() > MAX_CACHE_ENTRIES { if let Some(min) = self.roots.iter().min().cloned() { self.roots.remove(&min); - self.cache.retain(|_, (fork, _)| *fork > min); + for cache in self.cache.iter_mut() { + cache.retain(|_, (fork, _)| *fork > min); + } } } } /// Insert a new signature for a specific fork. pub fn insert(&mut self, transaction_blockhash: &Hash, sig: &Signature, fork: ForkId, res: T) { - let sig_map = self - .cache + self.insert_entry(transaction_blockhash, sig, vec![(fork, res)], 1); + } + + fn insert_entry( + &mut self, + transaction_blockhash: &Hash, + sig: &Signature, + status: Vec<(ForkId, T)>, + index: usize, + ) { + let fork = status + .iter() + .fold(0, |acc, (f, _)| if acc > *f { acc } else { *f }); + let sig_map = self.cache[index] .entry(*transaction_blockhash) .or_insert((fork, HashMap::new())); sig_map.0 = std::cmp::max(fork, sig_map.0); let sig_forks = sig_map.1.entry(*sig).or_insert_with(|| vec![]); - sig_forks.push((fork, res)); + sig_forks.extend(status); } /// Clear for testing pub fn clear_signatures(&mut self) { - for v in self.cache.values_mut() { - v.1 = HashMap::new(); + for cache in self.cache.iter_mut() { + for v in cache.values_mut() { + v.1 = HashMap::new(); + } + } + } + + pub fn append(&mut self, status_cache: &StatusCache) { + for (hash, sigmap) in status_cache.cache[1].iter() { + for (signature, fork_status) in sigmap.1.iter() { + self.insert_entry(hash, signature, fork_status.clone(), 0); + } + } + + self.roots = self.roots.union(&status_cache.roots).cloned().collect(); + } + + pub fn merge_caches(&mut self) { + let mut cache = HashMap::new(); + std::mem::swap(&mut cache, &mut self.cache[1]); + for (hash, sigmap) in cache.iter() { + for (signature, fork_status) in sigmap.1.iter() { + self.insert_entry(hash, signature, fork_status.clone(), 0); + } } } } @@ -247,28 +313,78 @@ mod tests { .is_some()); } - fn test_serialize(sc: &BankStatusCache) { - let mut buf = vec![0u8; serialized_size(sc).unwrap() as usize]; + #[test] + fn test_statuscache_append() { + let sig = Signature::default(); + let mut status_cache0 = BankStatusCache::default(); + let blockhash0 = hash(Hash::new(&vec![0; 32]).as_ref()); + status_cache0.add_root(0); + status_cache0.insert(&blockhash0, &sig, 0, ()); + + let sig = Signature::default(); + let mut status_cache1 = BankStatusCache::default(); + let blockhash1 = hash(Hash::new(&vec![1; 32]).as_ref()); + status_cache1.insert(&blockhash0, &sig, 1, ()); + status_cache1.add_root(1); + status_cache1.insert(&blockhash1, &sig, 1, ()); + + status_cache0.append(&status_cache1); + let roots: HashSet<_> = [0, 1].iter().cloned().collect(); + assert_eq!(status_cache0.roots, roots); + let ancestors = vec![(0, 1), (1, 1)].into_iter().collect(); + assert!(status_cache0 + .get_signature_status(&sig, &blockhash0, &ancestors) + .is_some()); + assert!(status_cache0 + .get_signature_status(&sig, &blockhash1, &ancestors) + .is_some()); + } + + fn test_serialize(sc: &mut BankStatusCache, blockhash: Vec, sig: &Signature) { + let len = serialized_size(&sc).unwrap(); + let mut buf = vec![0u8; len as usize]; let mut writer = Cursor::new(&mut buf[..]); + let cache0 = sc.cache[0].clone(); serialize_into(&mut writer, sc).unwrap(); + sc.merge_caches(); + let len = writer.position() as usize; - let mut reader = Cursor::new(&mut buf[..]); - let deser: BankStatusCache = deserialize_from(&mut reader).unwrap(); - assert_eq!(*sc, deser); + let mut reader = Cursor::new(&mut buf[..len]); + let mut status_cache: BankStatusCache = deserialize_from(&mut reader).unwrap(); + status_cache.cache[0] = cache0; + status_cache.merge_caches(); + assert!(status_cache.cache[0].len() > 0); + assert!(status_cache.cache[1].is_empty()); + let ancestors = vec![(0, 1), (1, 1)].into_iter().collect(); + assert_eq!(*sc, status_cache); + for hash in blockhash.iter() { + assert!(status_cache + .get_signature_status(&sig, &hash, &ancestors) + .is_some()); + } } #[test] fn test_statuscache_serialize() { let sig = Signature::default(); let mut status_cache = BankStatusCache::default(); - let blockhash = hash(Hash::default().as_ref()); + let blockhash0 = hash(Hash::new(&vec![0; 32]).as_ref()); status_cache.add_root(0); status_cache.clear_signatures(); - status_cache.insert(&blockhash, &sig, 0, ()); - test_serialize(&status_cache); + status_cache.insert(&blockhash0, &sig, 0, ()); + test_serialize(&mut status_cache, vec![blockhash0], &sig); + + status_cache.insert(&blockhash0, &sig, 1, ()); + test_serialize(&mut status_cache, vec![blockhash0], &sig); - let new_blockhash = hash(Hash::default().as_ref()); - status_cache.insert(&new_blockhash, &sig, 1, ()); - test_serialize(&status_cache); + let blockhash1 = hash(Hash::new(&vec![1; 32]).as_ref()); + status_cache.insert(&blockhash1, &sig, 1, ()); + test_serialize(&mut status_cache, vec![blockhash0, blockhash1], &sig); + + let blockhash2 = hash(Hash::new(&vec![2; 32]).as_ref()); + let ancestors = vec![(0, 1), (1, 1)].into_iter().collect(); + assert!(status_cache + .get_signature_status(&sig, &blockhash2, &ancestors) + .is_none()); } }