Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
carllin committed Oct 14, 2020
1 parent dc61337 commit 256a009
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 76 deletions.
1 change: 0 additions & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,6 @@ fn report_target_features() {
// Validator binaries built on a machine with AVX support will generate invalid opcodes
// when run on machines without AVX causing a non-obvious process abort. Instead detect
// the mismatch and error cleanly.
#[target_feature(enable = "avx")]
{
if is_x86_feature_detected!("avx") {
info!("AVX detected");
Expand Down
84 changes: 30 additions & 54 deletions runtime/benches/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,15 @@ fn bench_delete_dependencies(bencher: &mut Bencher) {
});
}

#[bench]
#[ignore]
fn bench_concurrent_read_write(bencher: &mut Bencher) {
fn store_accounts_with_possible_contention<F: 'static>(bench_name: &str, bencher: &mut Bencher, reader_f: F)
where
F: Fn(&Accounts, &[Pubkey]) -> () + Send + Copy,
{
let num_readers = 5;
let accounts = Arc::new(Accounts::new(
vec![
PathBuf::from(std::env::var("FARF_DIR").unwrap_or_else(|_| "farf".to_string()))
.join("concurrent_read_write"),
.join(bench_name),
],
&ClusterType::Development,
));
Expand All @@ -174,11 +175,7 @@ fn bench_concurrent_read_write(bencher: &mut Bencher) {
Builder::new()
.name("readers".to_string())
.spawn(move || {
let mut rng = rand::thread_rng();
loop {
let i = rng.gen_range(0, num_keys);
test::black_box(accounts.load_slow(&HashMap::new(), &pubkeys[i]).unwrap());
}
reader_f(&accounts, &pubkeys);
})
.unwrap();
}
Expand All @@ -198,50 +195,29 @@ fn bench_concurrent_read_write(bencher: &mut Bencher) {
}

#[bench]
fn bench_concurrent_scan_write(bencher: &mut Bencher) {
let num_readers = 5;
let accounts = Arc::new(Accounts::new(
vec![PathBuf::from("concurrent_read_write")],
&ClusterType::Development,
));
let num_keys = 1000;
let slot = 0;
accounts.add_root(slot);
let default_owner = Account::default().owner;
let pubkeys: Arc<Vec<_>> = Arc::new(
(0..num_keys)
.map(|_| {
let pubkey = Pubkey::new_rand();
let account = Account::new(1, 0, &Account::default().owner);
accounts.store_slow(slot, &pubkey, &account);
pubkey
})
.collect(),
);

for _ in 0..num_readers {
let accounts = accounts.clone();
let pubkeys = pubkeys.clone();
Builder::new()
.name("readers".to_string())
.spawn(move || {
loop {
accounts.load_by_program(&HashMap::new(), &default_owner);
}
})
.unwrap();
}
#[ignore]
fn bench_concurrent_read_write(bencher: &mut Bencher) {
store_accounts_with_possible_contention(
"concurrent_read_write",
bencher,
|accounts, pubkeys| {
let mut rng = rand::thread_rng();
loop {
let i = rng.gen_range(0, pubkeys.len());
test::black_box(accounts.load_slow(&HashMap::new(), &pubkeys[i]).unwrap());
}
},
)
}

let num_new_keys = 1000;
let new_accounts: Vec<_> = (0..num_new_keys)
.map(|_| Account::new(1, 0, &Account::default().owner))
.collect();
bencher.iter(|| {
for account in &new_accounts {
// Write to a different slot than the one being read from. Because
// there's a new account pubkey being written to every time, will
// compete for the accounts index lock on every store
accounts.store_slow(slot + 1, &Pubkey::new_rand(), &account);
}
})
#[bench]
#[ignore]
// Benchmark account stores while background reader threads repeatedly run
// whole-index program scans, so every store competes with full scans for the
// accounts index lock.
fn bench_concurrent_scan_write(bencher: &mut Bencher) {
    store_accounts_with_possible_contention(
        "concurrent_scan_write",
        bencher,
        // The pubkey list is unused by the scan path; name it `_pubkeys` so
        // the closure still matches the helper's Fn(&Accounts, &[Pubkey])
        // signature without an unused-variable warning.
        |accounts, _pubkeys| {
            // The scanned owner never changes; compute it once instead of
            // building a fresh default Account on every loop iteration.
            let default_owner = Account::default().owner;
            loop {
                accounts.load_by_program(&HashMap::new(), &default_owner);
            }
        },
    )
}
2 changes: 0 additions & 2 deletions runtime/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,6 @@ impl Accounts {
rent_collector: &RentCollector,
feature_set: &FeatureSet,
) -> Vec<(Result<TransactionLoadResult>, Option<HashAgeKind>)> {
//PERF: hold the lock to scan for the references, but not to clone the accounts
//TODO: two locks usually leads to deadlocks, should this be one structure?
let accounts_index = &self.accounts_db.accounts_index;

let fee_config = FeeConfig {
Expand Down
50 changes: 31 additions & 19 deletions runtime/src/accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::ops::{
use std::sync::atomic::{AtomicU64, Ordering};
use std::{
collections::{BTreeMap, HashMap, HashSet},
ops::RangeBounds,
sync::{Arc, RwLock, RwLockReadGuard},
ops::{Range, RangeBounds},
sync::{Arc, RwLock},
};

const ITER_BATCH_SIZE: usize = 1000;
Expand Down Expand Up @@ -43,7 +43,6 @@ pub type SlotSlice<'s, T> = &'s [(Slot, T)];
pub type Ancestors = HashMap<Slot, usize>;

pub type RefCount = u64;
type AccountMapEntry<T> = (AtomicU64, RwLock<SlotList<T>>);
pub type AccountMap<K, V> = BTreeMap<K, V>;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -137,6 +136,7 @@ pub struct AccountsIndexIterator<'a, T> {
account_maps: &'a RwLock<AccountMap<Pubkey, AccountMapEntry<T>>>,
start_bound: Bound<Pubkey>,
end_bound: Bound<Pubkey>,
is_finished: bool,
}

impl<'a, T> AccountsIndexIterator<'a, T> {
Expand Down Expand Up @@ -165,25 +165,29 @@ impl<'a, T> AccountsIndexIterator<'a, T> {
.map(|r| Self::clone_bound(r.end_bound()))
.unwrap_or(Unbounded),
account_maps,
is_finished: false,
}
}
}

impl<'a, T: 'static + Clone> Iterator for AccountsIndexIterator<'a, T> {
type Item = Vec<(Pubkey, AccountMapEntry<T>)>;
fn next(&mut self) -> Option<Self::Item> {
if self.is_finished {
return None;
}

let chunk: Vec<(Pubkey, AccountMapEntry<T>)> = self
.account_maps
.read()
.unwrap()
.range((self.start_bound, self.end_bound))
.map(|(pubkey, account_map_entry)| {
(*pubkey, account_map_entry.clone())
})
.map(|(pubkey, account_map_entry)| (*pubkey, account_map_entry.clone()))
.take(ITER_BATCH_SIZE)
.collect();

if chunk.is_empty() {
self.is_finished = true;
return None;
}

Expand Down Expand Up @@ -281,7 +285,7 @@ impl<T: 'static + Clone> AccountsIndex<T> {
where
F: FnMut(&Pubkey, (&T, Slot)),
{
self.do_scan_accounts(ancestors, func, Some(Pubkey::default()..));
self.do_scan_accounts(ancestors, func, None::<Range<Pubkey>>);
}

/// call func with every pubkey and index visible from a given set of ancestors with range
Expand Down Expand Up @@ -330,8 +334,8 @@ impl<T: 'static + Clone> AccountsIndex<T> {
.filter(|(slot, _)| slots.contains(&slot))
.cloned()
.collect();
list.retain(|(slot, _)| !slots.contains(slot));
(reclaims, list.is_empty())
slot_list.retain(|(slot, _)| !slots.contains(slot));
(reclaims, slot_list.is_empty())
}

// Given a SlotSlice `L`, a list of ancestors and a maximum slot, find the latest element
Expand Down Expand Up @@ -418,16 +422,14 @@ impl<T: 'static + Clone> AccountsIndex<T> {
}

pub fn unref_from_storage(&self, pubkey: &Pubkey) {
let locked_entry = self.account_maps.get(pubkey);
if let Some(entry) = locked_entry {
entry.0.fetch_sub(1, Ordering::Relaxed);
if let Some(locked_entry) = self.get_account_read_entry(pubkey) {
locked_entry.ref_count.fetch_sub(1, Ordering::Relaxed);
}
}

pub fn ref_count_from_storage(&self, pubkey: &Pubkey) -> RefCount {
let locked_entry = self.account_maps.get(pubkey);
if let Some(entry) = locked_entry {
entry.0.load(Ordering::Relaxed)
if let Some(locked_entry) = self.get_account_read_entry(pubkey) {
locked_entry.ref_count.load(Ordering::Relaxed)
} else {
0
}
Expand All @@ -439,9 +441,9 @@ impl<T: 'static + Clone> AccountsIndex<T> {
reclaims: &mut SlotList<T>,
max_clean_root: Option<Slot>,
) {
let roots = &self.roots;
let roots_traker = &self.roots_tracker.read().unwrap();

let max_root = Self::get_max_root(roots, &list, max_clean_root);
let max_root = Self::get_max_root(&roots_traker.roots, &list, max_clean_root);

reclaims.extend(
list.iter()
Expand Down Expand Up @@ -485,7 +487,7 @@ impl<T: 'static + Clone> AccountsIndex<T> {
}

pub fn is_root(&self, slot: Slot) -> bool {
self.roots.contains(&slot)
self.roots_tracker.read().unwrap().roots.contains(&slot)
}

pub fn add_root(&self, slot: Slot) {
Expand Down Expand Up @@ -515,7 +517,7 @@ impl<T: 'static + Clone> AccountsIndex<T> {
// Only keep the slots that have yet to be cleaned
!is_cleaned
});
std::mem::replace(&mut self.previous_uncleaned_roots, cleaned_roots)
std::mem::replace(&mut w_roots_tracker.previous_uncleaned_roots, cleaned_roots)
}

pub fn is_uncleaned_root(&self, slot: Slot) -> bool {
Expand Down Expand Up @@ -752,6 +754,16 @@ mod tests {
run_test_scan_accounts(ITER_BATCH_SIZE * 10 + 1);
}

#[test]
fn test_accounts_iter_finished() {
    // An iterator over an empty index yields nothing and becomes finished.
    let (index, _) = setup_accounts_index_keys(0);
    let mut iter = index.iter(None::<Range<Pubkey>>);
    assert!(iter.next().is_none());

    // A finished iterator must stay exhausted even after new entries are
    // inserted into the index — it must not resume and pick them up.
    let mut reclaims = vec![];
    index.update_or_create_if_missing(0, &Pubkey::new_rand(), true, &mut reclaims);
    assert!(iter.next().is_none());
}

#[test]
fn test_is_root() {
let index = AccountsIndex::<bool>::default();
Expand Down

0 comments on commit 256a009

Please sign in to comment.