Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
regenerate account index from the storage
Browse files Browse the repository at this point in the history
  • Loading branch information
sambley committed May 1, 2019
1 parent ee58dad commit 003a4a0
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 35 deletions.
9 changes: 5 additions & 4 deletions core/src/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ impl BankForks {
.collect();
let descendants = self.descendants();
self.banks
.retain(|slot, _| descendants[&root].contains(slot))
self.banks
.retain(|slot, bank| *slot >= root || bank.is_in_subtree_of(root));
.retain(|slot, _| descendants[&root].contains(slot));
let diff: HashSet<_> = slots.symmetric_difference(&self.slots).collect();
for slot in diff.iter() {
if **slot > root {
Expand Down Expand Up @@ -203,6 +201,7 @@ impl BankForks {
names.sort();
let mut banks: HashMap<u64, Arc<Bank>> = HashMap::new();
let mut slots = HashSet::new();
let mut last_slot: u64 = 0;
for bank_slot in names.clone() {
let bank_path = format!("{}", bank_slot);
let bank_file_path = path.join(bank_path.clone());
Expand All @@ -215,11 +214,13 @@ impl BankForks {
Ok(v) => {
banks.insert(bank_slot, Arc::new(v));
slots.insert(bank_slot);
last_slot = bank_slot;
}
Err(_) => warn!("Load snapshot failed for {}", bank_slot),
}
}
let working_bank = banks[&names[names.len() - 1]].clone();
info!("last slot: {}", last_slot);
let working_bank = banks[&last_slot].clone();
Ok(BankForks {
banks,
working_bank,
Expand Down
6 changes: 5 additions & 1 deletion core/src/fullnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Fullnode {
);
let blocktree = Arc::new(blocktree);

let (poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal(
let (mut poh_recorder, entry_receiver) = PohRecorder::new_with_clear_signal(
bank.tick_height(),
bank.last_blockhash(),
bank.slot(),
Expand All @@ -127,6 +127,10 @@ impl Fullnode {
blocktree.new_blobs_signals.first().cloned(),
&leader_schedule_cache,
);
if config.use_snapshot {
poh_recorder.set_bank(&bank);
}

let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_service = PohService::new(poh_recorder.clone(), &config.tick_config, &exit);
assert_eq!(
Expand Down
14 changes: 10 additions & 4 deletions runtime/src/accounts.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::accounts_db::{
get_paths_vec, AccountInfo, AccountStorage, AccountsDB, ErrorCounters, InstructionAccounts,
InstructionLoaders,
get_paths_vec, AccountInfo, AccountStorage, AccountsDB, AppendVecId, ErrorCounters,
InstructionAccounts, InstructionLoaders,
};
use crate::accounts_index::{AccountsIndex, Fork};
use crate::append_vec::StoredAccount;
Expand Down Expand Up @@ -44,6 +44,7 @@ pub struct Accounts {
pub accounts_db: Arc<AccountsDB>,

/// set of accounts which are currently in the pipeline
#[serde(skip)]
account_locks: Mutex<AccountLocks>,

/// List of persistent stores
Expand Down Expand Up @@ -300,7 +301,9 @@ impl Accounts {
pub fn load_by_program(&self, fork: Fork, program_id: &Pubkey) -> Vec<(Pubkey, Account)> {
let accumulator: Vec<Vec<(Pubkey, u64, Account)>> = self.accounts_db.scan_account_storage(
fork,
|stored_account: &StoredAccount, accum: &mut Vec<(Pubkey, u64, Account)>| {
|stored_account: &StoredAccount,
_id: AppendVecId,
accum: &mut Vec<(Pubkey, u64, Account)>| {
if stored_account.balance.owner == *program_id {
let val = (
stored_account.meta.pubkey,
Expand Down Expand Up @@ -383,7 +386,9 @@ impl Accounts {
pub fn hash_internal_state(&self, fork_id: Fork) -> Option<Hash> {
let accumulator: Vec<Vec<(Pubkey, u64, Hash)>> = self.accounts_db.scan_account_storage(
fork_id,
|stored_account: &StoredAccount, accum: &mut Vec<(Pubkey, u64, Hash)>| {
|stored_account: &StoredAccount,
_id: AppendVecId,
accum: &mut Vec<(Pubkey, u64, Hash)>| {
accum.push((
stored_account.meta.pubkey,
stored_account.meta.write_version,
Expand Down Expand Up @@ -1047,6 +1052,7 @@ mod tests {
let mut pubkeys: Vec<Pubkey> = vec![];
create_accounts(&accounts, &mut pubkeys, 100);
check_accounts(&accounts, &pubkeys, 100);
accounts.add_root(0);

let mut buf = vec![0u8; serialized_size(&accounts).unwrap() as usize];
let mut writer = Cursor::new(&mut buf[..]);
Expand Down
188 changes: 168 additions & 20 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
use crate::accounts_index::{AccountsIndex, Fork};
use crate::append_vec::{AppendVec, StorageMeta, StoredAccount};
use crate::serde_utils::{deserialize_atomicusize, serialize_atomicusize};
use bincode::{deserialize_from, serialize_into, serialized_size};
use log::*;
use rand::{thread_rng, Rng};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use solana_sdk::account::Account;
use solana_sdk::pubkey::Pubkey;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::fs::{create_dir_all, remove_dir_all};
use std::io::Cursor;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
Expand Down Expand Up @@ -63,7 +66,7 @@ pub struct AccountInfo {
lamports: u64,
}
/// An offset into the AccountsDB::storage vector
type AppendVecId = usize;
pub type AppendVecId = usize;
pub type AccountStorage = HashMap<usize, Arc<AccountStorageEntry>>;
pub type InstructionAccounts = Vec<Account>;
pub type InstructionLoaders = Vec<Vec<(Pubkey, Account)>>;
Expand Down Expand Up @@ -98,13 +101,11 @@ pub struct AccountStorageEntry {
/// Keeps track of the number of accounts stored in a specific AppendVec.
/// This is periodically checked to reuse the stores that do not have
/// any accounts in it.
#[serde(serialize_with = "serialize_atomicusize")]
#[serde(deserialize_with = "deserialize_atomicusize")]
#[serde(skip)]
count: AtomicUsize,

/// status corresponding to the storage
#[serde(serialize_with = "serialize_atomicusize")]
#[serde(deserialize_with = "deserialize_atomicusize")]
#[serde(skip)]
status: AtomicUsize,
}

Expand Down Expand Up @@ -146,7 +147,7 @@ impl AccountStorageEntry {
}

// This structure handles the load/store of the accounts
#[derive(Default, Debug, Serialize, Deserialize)]
#[derive(Default, Debug)]
pub struct AccountsDB {
/// Keeps tracks of index into AppendVec on a per fork basis
pub accounts_index: RwLock<AccountsIndex<AccountInfo>>,
Expand All @@ -155,13 +156,9 @@ pub struct AccountsDB {
pub storage: RwLock<AccountStorage>,

/// distribute the accounts across storage lists
#[serde(serialize_with = "serialize_atomicusize")]
#[serde(deserialize_with = "deserialize_atomicusize")]
next_id: AtomicUsize,

/// write version
#[serde(serialize_with = "serialize_atomicusize")]
#[serde(deserialize_with = "deserialize_atomicusize")]
write_version: AtomicUsize,

/// Set of storage paths to pick from
Expand Down Expand Up @@ -214,7 +211,7 @@ impl AccountsDB {
// PERF: Sequentially read each storage entry in parallel
pub fn scan_account_storage<F, B>(&self, fork_id: Fork, scan_func: F) -> Vec<B>
where
F: Fn(&StoredAccount, &mut B) -> (),
F: Fn(&StoredAccount, AppendVecId, &mut B) -> (),
F: Send + Sync,
B: Send + Default,
{
Expand All @@ -233,7 +230,7 @@ impl AccountsDB {
let mut retval = B::default();
accounts
.iter()
.for_each(|stored_account| scan_func(stored_account, &mut retval));
.for_each(|stored_account| scan_func(stored_account, storage.id, &mut retval));
retval
})
.collect()
Expand Down Expand Up @@ -409,6 +406,136 @@ impl AccountsDB {
pub fn add_root(&self, fork: Fork) {
self.accounts_index.write().unwrap().add_root(fork)
}

fn merge(
dest: &mut HashMap<Pubkey, (u64, AccountInfo)>,
source: &HashMap<Pubkey, (u64, AccountInfo)>,
) {
for (key, (source_version, source_info)) in source.iter() {
if let Some((dest_version, _)) = dest.get(key) {
if dest_version > source_version {
continue;
}
}
dest.insert(*key, (*source_version, source_info.clone()));
}
}

pub fn generate_index(&mut self) {
let forks: HashSet<Fork> = self
.storage
.read()
.unwrap()
.values()
.map(|x| x.fork_id)
.collect();

for fork_id in forks.iter() {
let mut accumulator: Vec<HashMap<Pubkey, (u64, AccountInfo)>> = self
.scan_account_storage(
*fork_id,
|stored_account: &StoredAccount,
id: AppendVecId,
accum: &mut HashMap<Pubkey, (u64, AccountInfo)>| {
let account_info = AccountInfo {
id,
offset: stored_account.offset,
lamports: stored_account.balance.lamports,
};
accum.insert(
stored_account.meta.pubkey,
(stored_account.meta.write_version, account_info),
);
},
);

let mut account_maps = accumulator.pop().unwrap();
while let Some(maps) = accumulator.pop() {
AccountsDB::merge(&mut account_maps, &maps);
}
let storage = self.fork_storage(*fork_id);
let mut accounts_index = self.accounts_index.write().unwrap();
for (pubkey, (_, account_info)) in account_maps.iter() {
accounts_index.insert(*fork_id, pubkey, account_info.clone());
storage.add_account();
}
}
}
}

impl Serialize for AccountsDB {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
use serde::ser::Error;
let len = serialized_size(&self.accounts_index).unwrap()
+ serialized_size(&self.paths).unwrap()
+ serialized_size(&self.storage).unwrap()
+ std::mem::size_of::<u64>() as u64
+ serialized_size(&self.file_size).unwrap();
let mut buf = vec![0u8; len as usize];
let mut wr = Cursor::new(&mut buf[..]);
serialize_into(&mut wr, &self.accounts_index).map_err(Error::custom)?;
serialize_into(&mut wr, &self.paths).map_err(Error::custom)?;
serialize_into(&mut wr, &self.storage).map_err(Error::custom)?;
serialize_into(
&mut wr,
&(self.write_version.load(Ordering::Relaxed) as u64),
)
.map_err(Error::custom)?;
serialize_into(&mut wr, &self.file_size).map_err(Error::custom)?;
let len = wr.position() as usize;
serializer.serialize_bytes(&wr.into_inner()[..len])
}
}

struct AccountsDBVisitor;

impl<'a> serde::de::Visitor<'a> for AccountsDBVisitor {
type Value = AccountsDB;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("Expecting AccountsDB")
}

fn visit_bytes<E>(self, data: &[u8]) -> std::result::Result<Self::Value, E>
where
E: serde::de::Error,
{
use serde::de::Error;
let mut rd = Cursor::new(&data[..]);
let accounts_index: RwLock<AccountsIndex<AccountInfo>> =
deserialize_from(&mut rd).map_err(Error::custom)?;
let paths: Vec<String> = deserialize_from(&mut rd).map_err(Error::custom)?;
let storage: RwLock<AccountStorage> = deserialize_from(&mut rd).map_err(Error::custom)?;
let write_version: u64 = deserialize_from(&mut rd).map_err(Error::custom)?;
let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?;

let mut ids: Vec<usize> = storage.read().unwrap().keys().cloned().collect();
ids.sort();

let mut accounts_db = AccountsDB {
accounts_index,
storage,
next_id: AtomicUsize::new(ids[ids.len() - 1] + 1),
write_version: AtomicUsize::new(write_version as usize),
paths,
file_size,
};
accounts_db.generate_index();

Ok(accounts_db)
}
}

impl<'de> Deserialize<'de> for AccountsDB {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: ::serde::Deserializer<'de>,
{
deserializer.deserialize_bytes(AccountsDBVisitor)
}
}

#[cfg(test)]
Expand All @@ -418,7 +545,6 @@ mod tests {
use bincode::{deserialize_from, serialize_into, serialized_size};
use rand::{thread_rng, Rng};
use solana_sdk::account::Account;
use std::io::Cursor;

fn cleanup_paths(paths: &str) {
let paths = get_paths_vec(&paths);
Expand Down Expand Up @@ -704,16 +830,35 @@ mod tests {
stores[&0].count.load(Ordering::Relaxed) == count
}

fn check_accounts(accounts: &AccountsDB, pubkeys: &Vec<Pubkey>, fork: Fork, num: usize) {
fn check_accounts(
accounts: &AccountsDB,
pubkeys: &Vec<Pubkey>,
fork: Fork,
num: usize,
count: usize,
) {
for _ in 1..num {
let idx = thread_rng().gen_range(0, num - 1);
let ancestors = vec![(fork, 0)].into_iter().collect();
let account = accounts.load_slow(&ancestors, &pubkeys[idx]).unwrap();
let account1 = Account::new((idx + 1) as u64, 0, &Account::default().owner);
let account1 = Account::new((idx + count) as u64, 0, &Account::default().owner);
assert_eq!(account, account1);
}
}

fn modify_accounts(
accounts: &AccountsDB,
pubkeys: &Vec<Pubkey>,
fork: Fork,
num: usize,
count: usize,
) {
for idx in 0..num {
let account = Account::new((idx + count) as u64, 0, &Account::default().owner);
accounts.store(fork, &[(&pubkeys[idx], &account)]);
}
}

#[test]
fn test_account_one() {
let paths = get_tmp_accounts_path!();
Expand All @@ -733,7 +878,7 @@ mod tests {
let accounts = AccountsDB::new(&paths.paths);
let mut pubkeys: Vec<Pubkey> = vec![];
create_account(&accounts, &mut pubkeys, 0, 100, 0, 0);
check_accounts(&accounts, &pubkeys, 0, 100);
check_accounts(&accounts, &pubkeys, 0, 100, 1);
}

#[test]
Expand Down Expand Up @@ -897,7 +1042,10 @@ mod tests {
let mut pubkeys: Vec<Pubkey> = vec![];
create_account(&accounts, &mut pubkeys, 0, 100, 0, 0);
assert_eq!(check_storage(&accounts, 100), true);
check_accounts(&accounts, &pubkeys, 0, 100);
check_accounts(&accounts, &pubkeys, 0, 100, 1);
modify_accounts(&accounts, &pubkeys, 0, 100, 2);
check_accounts(&accounts, &pubkeys, 0, 100, 2);
accounts.add_root(0);

let mut pubkeys1: Vec<Pubkey> = vec![];
create_account(&accounts, &mut pubkeys1, 1, 10, 0, 0);
Expand All @@ -908,7 +1056,7 @@ mod tests {

let mut reader = Cursor::new(&mut buf[..]);
let daccounts: AccountsDB = deserialize_from(&mut reader).unwrap();
check_accounts(&daccounts, &pubkeys, 0, 100);
check_accounts(&daccounts, &pubkeys1, 1, 10);
check_accounts(&daccounts, &pubkeys, 0, 100, 2);
check_accounts(&daccounts, &pubkeys1, 1, 10, 1);
}
}
Loading

0 comments on commit 003a4a0

Please sign in to comment.