Skip to content

Commit

Permalink
keep saved and unsaved copies of status cache
Browse files Browse the repository at this point in the history
  • Loading branch information
sambley committed May 13, 2019
1 parent 5d19fc9 commit ee865bf
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 71 deletions.
57 changes: 37 additions & 20 deletions core/src/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use bincode::{deserialize_from, serialize_into};
use solana_metrics::inc_new_counter_info;
use solana_runtime::bank::{Bank, BankRc};
use solana_runtime::bank::{Bank, BankRc, StatusCacheRc};
use solana_sdk::timing;
use std::collections::{HashMap, HashSet};
use std::env;
Expand Down Expand Up @@ -157,7 +157,7 @@ impl BankForks {
if **slot > root {
let _ = self.add_snapshot(**slot, root);
} else if **slot > 0 {
self.remove_snapshot(**slot);
BankForks::remove_snapshot(**slot);
}
}
}
Expand Down Expand Up @@ -191,14 +191,16 @@ impl BankForks {
}
serialize_into(&mut stream, &parent_slot)
.map_err(|_| BankForks::get_io_error("serialize bank parent error"))?;
serialize_into(&mut stream, &bank.rc)
.map_err(|_| BankForks::get_io_error("serialize bank rc error"))?;
serialize_into(&mut stream, &root)
.map_err(|_| BankForks::get_io_error("serialize root error"))?;
serialize_into(&mut stream, &bank.src)
.map_err(|_| BankForks::get_io_error("serialize bank status cache error"))?;
serialize_into(&mut stream, &bank.rc)
.map_err(|_| BankForks::get_io_error("serialize bank accounts error"))?;
Ok(())
}

pub fn remove_snapshot(&self, slot: u64) {
pub fn remove_snapshot(slot: u64) {
let path = BankForks::get_snapshot_path();
let bank_file = format!("{}", slot);
let bank_file_path = path.join(bank_file);
Expand All @@ -212,14 +214,15 @@ impl BankForks {
fn setup_banks(
bank_maps: &mut Vec<(u64, u64, Bank)>,
bank_rc: &BankRc,
status_cache_rc: &StatusCacheRc,
) -> (HashMap<u64, Arc<Bank>>, HashSet<u64>, u64) {
let mut banks = HashMap::new();
let mut slots = HashSet::new();
let (last_slot, last_parent_slot, mut last_bank) = bank_maps.remove(0);
last_bank.set_bank_rc(&bank_rc);
last_bank.set_bank_rc(&bank_rc, &status_cache_rc);

while let Some((slot, parent_slot, mut bank)) = bank_maps.pop() {
bank.set_bank_rc(&bank_rc);
bank.set_bank_rc(&bank_rc, &status_cache_rc);
if parent_slot != 0 {
if let Some(parent) = banks.get(&parent_slot) {
bank.set_parent(parent);
Expand Down Expand Up @@ -255,7 +258,8 @@ impl BankForks {
names.sort();
let mut bank_maps = vec![];
let mut bank_rc: Option<BankRc> = None;
let mut root: u64 = 0;
let status_cache_rc = StatusCacheRc::default();
let mut bank_forks_root: u64 = 0;
for bank_slot in names.iter().rev() {
let bank_path = format!("{}", bank_slot);
let bank_file_path = path.join(bank_path.clone());
Expand All @@ -267,33 +271,44 @@ impl BankForks {
let slot: Result<u64, std::io::Error> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize bank parent error"));
let parent_slot = if slot.is_ok() { slot.unwrap() } else { 0 };
let root: Result<u64, std::io::Error> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize root error"));
let status_cache: Result<StatusCacheRc, std::io::Error> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize bank status cache error"));
if bank_rc.is_none() {
let rc: Result<BankRc, std::io::Error> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize bank rc error"));
.map_err(|_| BankForks::get_io_error("deserialize bank accounts error"));
if rc.is_ok() {
bank_rc = Some(rc.unwrap());
let r: Result<u64, std::io::Error> = deserialize_from(&mut stream)
.map_err(|_| BankForks::get_io_error("deserialize root error"));
if r.is_ok() {
root = r.unwrap();
}
bank_forks_root = root.unwrap();
}
}
match bank {
Ok(v) => bank_maps.push((*bank_slot, parent_slot, v)),
Err(_) => warn!("Load snapshot failed for {}", bank_slot),
if bank_rc.is_some() {
match bank {
Ok(v) => {
if status_cache.is_ok() {
status_cache_rc.append(&status_cache.unwrap());
}
bank_maps.push((*bank_slot, parent_slot, v));
}
Err(_) => warn!("Load snapshot failed for {}", bank_slot),
}
} else {
BankForks::remove_snapshot(*bank_slot);
warn!("Load snapshot rc failed for {}", bank_slot);
}
}
if bank_maps.is_empty() || bank_rc.is_none() {
return Err(Error::new(ErrorKind::Other, "no snapshots loaded"));
}

let (banks, slots, last_slot) = BankForks::setup_banks(&mut bank_maps, &bank_rc.unwrap());
let (banks, slots, last_slot) =
BankForks::setup_banks(&mut bank_maps, &bank_rc.unwrap(), &status_cache_rc);
let working_bank = banks[&last_slot].clone();
Ok(BankForks {
banks,
working_bank,
root,
root: bank_forks_root,
slots,
use_snapshot: true,
})
Expand Down Expand Up @@ -434,7 +449,7 @@ mod tests {
bank.compare_bank(&new_bank);
}
for (slot, _) in new.banks.iter() {
new.remove_snapshot(*slot);
BankForks::remove_snapshot(*slot);
}
}

Expand All @@ -461,6 +476,8 @@ mod tests {
bank.freeze();
bank_forks.insert(bank);
save_and_load_snapshot(&bank_forks);
let bank = bank_forks.get(index).unwrap();
bank.clear_signatures();
}
assert_eq!(bank_forks.working_bank().slot(), 10);
}
Expand Down
101 changes: 81 additions & 20 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,6 @@ pub struct BankRc {
/// where all the Accounts are stored
accounts: Arc<Accounts>,

/// A cache of signature statuses
status_cache: Arc<RwLock<BankStatusCache>>,

/// Previous checkpoint of this bank
parent: RwLock<Option<Arc<Bank>>>,
}
Expand All @@ -156,13 +153,11 @@ impl Serialize for BankRc {
{
use serde::ser::Error;
let len = serialized_size(&*self.accounts.accounts_db).unwrap()
+ serialized_size(&*self.accounts).unwrap()
+ serialized_size(&*self.status_cache).unwrap();
+ serialized_size(&*self.accounts).unwrap();
let mut buf = vec![0u8; len as usize];
let mut wr = Cursor::new(&mut buf[..]);
serialize_into(&mut wr, &*self.accounts.accounts_db).map_err(Error::custom)?;
serialize_into(&mut wr, &*self.accounts).map_err(Error::custom)?;
serialize_into(&mut wr, &*self.status_cache).map_err(Error::custom)?;
serialize_into(&mut wr, &*self.accounts.accounts_db).map_err(Error::custom)?;
let len = wr.position() as usize;
serializer.serialize_bytes(&wr.into_inner()[..len])
}
Expand All @@ -184,14 +179,12 @@ impl<'a> serde::de::Visitor<'a> for BankRcVisitor {
{
use serde::de::Error;
let mut rd = Cursor::new(&data[..]);
let accounts_db: AccountsDB = deserialize_from(&mut rd).map_err(Error::custom)?;
let mut accounts: Accounts = deserialize_from(&mut rd).map_err(Error::custom)?;
let status_cache: BankStatusCache = deserialize_from(&mut rd).map_err(Error::custom)?;
let accounts_db: AccountsDB = deserialize_from(&mut rd).map_err(Error::custom)?;

accounts.accounts_db = Arc::new(accounts_db);
Ok(BankRc {
accounts: Arc::new(accounts),
status_cache: Arc::new(RwLock::new(status_cache)),
parent: RwLock::new(None),
})
}
Expand All @@ -206,13 +199,81 @@ impl<'de> Deserialize<'de> for BankRc {
}
}

#[derive(Default)]
pub struct StatusCacheRc {
/// where all the Accounts are stored
/// A cache of signature statuses
status_cache: Arc<RwLock<BankStatusCache>>,
}

impl Serialize for StatusCacheRc {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
use serde::ser::Error;
let len = serialized_size(&*self.status_cache).unwrap();
let mut buf = vec![0u8; len as usize];
let mut wr = Cursor::new(&mut buf[..]);
{
let mut status_cache = self.status_cache.write().unwrap();
serialize_into(&mut wr, &*status_cache).map_err(Error::custom)?;
status_cache.merge_caches();
}
let len = wr.position() as usize;
serializer.serialize_bytes(&wr.into_inner()[..len])
}
}

struct StatusCacheRcVisitor;

impl<'a> serde::de::Visitor<'a> for StatusCacheRcVisitor {
type Value = StatusCacheRc;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("Expecting StatusCacheRc")
}

#[allow(clippy::mutex_atomic)]
fn visit_bytes<E>(self, data: &[u8]) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
use serde::de::Error;
let mut rd = Cursor::new(&data[..]);
let status_cache: BankStatusCache = deserialize_from(&mut rd).map_err(Error::custom)?;
Ok(StatusCacheRc {
status_cache: Arc::new(RwLock::new(status_cache)),
})
}
}

impl<'de> Deserialize<'de> for StatusCacheRc {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: ::serde::Deserializer<'de>,
{
deserializer.deserialize_bytes(StatusCacheRcVisitor)
}
}

impl StatusCacheRc {
pub fn append(&self, status_cache_rc: &StatusCacheRc) {
let sc = status_cache_rc.status_cache.write().unwrap();
self.status_cache.write().unwrap().append(&sc);
}
}

/// Manager for the state of all accounts and programs after processing its entries.
#[derive(Default, Deserialize, Serialize)]
pub struct Bank {
/// References to accounts, parent and signature status
#[serde(skip)]
pub rc: BankRc,

#[serde(skip)]
pub src: StatusCacheRc,

/// FIFO queue of `recent_blockhash` items
blockhash_queue: RwLock<BlockhashQueue>,

Expand Down Expand Up @@ -305,7 +366,7 @@ impl Bank {

let mut bank = Self::default();
bank.blockhash_queue = RwLock::new(parent.blockhash_queue.read().unwrap().clone());
bank.rc.status_cache = parent.rc.status_cache.clone();
bank.src.status_cache = parent.src.status_cache.clone();
bank.bank_height = parent.bank_height + 1;
bank.fee_calculator = parent.fee_calculator.clone();

Expand Down Expand Up @@ -399,7 +460,7 @@ impl Bank {
let squash_cache_start = Instant::now();
parents
.iter()
.for_each(|p| self.rc.status_cache.write().unwrap().add_root(p.slot()));
.for_each(|p| self.src.status_cache.write().unwrap().add_root(p.slot()));
let squash_cache_ms = duration_as_ms(&squash_cache_start.elapsed());

datapoint!(
Expand Down Expand Up @@ -483,7 +544,7 @@ impl Bank {

/// Forget all signatures. Useful for benchmarking.
pub fn clear_signatures(&self) {
self.rc.status_cache.write().unwrap().clear_signatures();
self.src.status_cache.write().unwrap().clear_signatures();
}

pub fn can_commit(result: &Result<()>) -> bool {
Expand All @@ -495,7 +556,7 @@ impl Bank {
}

fn update_transaction_statuses(&self, txs: &[Transaction], res: &[Result<()>]) {
let mut status_cache = self.rc.status_cache.write().unwrap();
let mut status_cache = self.src.status_cache.write().unwrap();
for (i, tx) in txs.iter().enumerate() {
if Self::can_commit(&res[i]) && !tx.signatures.is_empty() {
status_cache.insert(
Expand Down Expand Up @@ -678,7 +739,7 @@ impl Bank {
lock_results: Vec<Result<()>>,
error_counters: &mut ErrorCounters,
) -> Vec<Result<()>> {
let rcache = self.rc.status_cache.read().unwrap();
let rcache = self.src.status_cache.read().unwrap();
txs.iter()
.zip(lock_results.into_iter())
.map(|(tx, lock_res)| {
Expand Down Expand Up @@ -978,9 +1039,9 @@ impl Bank {
self.rc.accounts.clone()
}

pub fn set_bank_rc(&mut self, bank_rc: &BankRc) {
pub fn set_bank_rc(&mut self, bank_rc: &BankRc, status_cache_rc: &StatusCacheRc) {
self.rc.accounts = bank_rc.accounts.clone();
self.rc.status_cache = bank_rc.status_cache.clone()
self.src.status_cache = status_cache_rc.status_cache.clone()
}

pub fn set_parent(&mut self, parent: &Arc<Bank>) {
Expand Down Expand Up @@ -1018,7 +1079,7 @@ impl Bank {
&self,
signature: &Signature,
) -> Option<(usize, Result<()>)> {
let rcache = self.rc.status_cache.read().unwrap();
let rcache = self.src.status_cache.read().unwrap();
rcache.get_signature_status_slow(signature, &self.ancestors)
}

Expand Down Expand Up @@ -1165,8 +1226,8 @@ impl Bank {
let dbhq = dbank.blockhash_queue.read().unwrap();
assert_eq!(*bhq, *dbhq);

let sc = self.rc.status_cache.read().unwrap();
let dsc = dbank.rc.status_cache.read().unwrap();
let sc = self.src.status_cache.read().unwrap();
let dsc = dbank.src.status_cache.read().unwrap();
assert_eq!(*sc, *dsc);
assert_eq!(
self.rc.accounts.hash_internal_state(self.slot),
Expand Down
Loading

0 comments on commit ee865bf

Please sign in to comment.