Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

implements random eviction for read-only accounts cache #32721

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ humantime = "2.0.1"
hyper = "0.14.27"
hyper-proxy = "0.9.1"
im = "15.1.0"
index_list = "0.2.7"
indexmap = "2.0.0"
indicatif = "0.17.6"
Inflector = "0.11.4"
Expand Down
8 changes: 1 addition & 7 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ flate2 = { workspace = true }
fnv = { workspace = true }
fs-err = { workspace = true }
im = { workspace = true, features = ["rayon", "serde"] }
index_list = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
Expand Down
134 changes: 47 additions & 87 deletions runtime/src/read_only_accounts_cache.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
//! ReadOnlyAccountsCache used to store accounts, such as executable accounts,
//! which can be large, loaded many times, and rarely change.
use {
dashmap::{mapref::entry::Entry, DashMap},
index_list::{Index, IndexList},
indexmap::{map::Entry, IndexMap},
rand::Rng,
solana_measure::measure_us,
solana_sdk::{
account::{AccountSharedData, ReadableAccount},
clock::Slot,
pubkey::Pubkey,
},
std::sync::{
atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering},
Mutex,
atomic::{AtomicU64, AtomicUsize, Ordering},
RwLock,
},
};

const PRUNE_RANDOM_SAMPLE_SIZE: usize = 8;
const CACHE_ENTRY_SIZE: usize =
std::mem::size_of::<ReadOnlyAccountCacheEntry>() + 2 * std::mem::size_of::<ReadOnlyCacheKey>();

Expand All @@ -23,18 +24,13 @@ type ReadOnlyCacheKey = (Pubkey, Slot);
#[derive(Debug)]
struct ReadOnlyAccountCacheEntry {
account: AccountSharedData,
index: AtomicU32, // Index of the entry in the eviction queue.
ordinal: AtomicU64, // Ordinal value indicating last access.
}

#[derive(Debug)]
pub(crate) struct ReadOnlyAccountsCache {
cache: DashMap<ReadOnlyCacheKey, ReadOnlyAccountCacheEntry>,
// When an item is first entered into the cache, it is added to the end of
// the queue. Also each time an entry is looked up from the cache it is
// moved to the end of the queue. As a result, items in the queue are
// always sorted in the order that they have last been accessed. When doing
// LRU eviction, cache entries are evicted from the front of the queue.
queue: Mutex<IndexList<ReadOnlyCacheKey>>,
cache: RwLock<IndexMap<ReadOnlyCacheKey, ReadOnlyAccountCacheEntry>>,
counter: AtomicU64,
max_data_size: usize,
data_size: AtomicUsize,
hits: AtomicU64,
Expand All @@ -47,8 +43,8 @@ impl ReadOnlyAccountsCache {
pub(crate) fn new(max_data_size: usize) -> Self {
Self {
max_data_size,
cache: DashMap::default(),
queue: Mutex::<IndexList<ReadOnlyCacheKey>>::default(),
cache: RwLock::<IndexMap<_, _>>::default(),
counter: AtomicU64::default(),
data_size: AtomicUsize::default(),
hits: AtomicU64::default(),
misses: AtomicU64::default(),
Expand All @@ -60,8 +56,8 @@ impl ReadOnlyAccountsCache {
/// reset the read only accounts cache
/// useful for benches/tests
pub fn reset_for_tests(&self) {
self.cache.clear();
self.queue.lock().unwrap().clear();
self.cache.write().unwrap().clear();
self.counter.store(0, Ordering::Relaxed);
self.data_size.store(0, Ordering::Relaxed);
self.hits.store(0, Ordering::Relaxed);
self.misses.store(0, Ordering::Relaxed);
Expand All @@ -71,26 +67,21 @@ impl ReadOnlyAccountsCache {

/// true if pubkey is in cache at slot
pub fn in_cache(&self, pubkey: &Pubkey, slot: Slot) -> bool {
self.cache.contains_key(&(*pubkey, slot))
self.cache.read().unwrap().contains_key(&(*pubkey, slot))
}

pub(crate) fn load(&self, pubkey: Pubkey, slot: Slot) -> Option<AccountSharedData> {
let (account, load_us) = measure_us!({
let key = (pubkey, slot);
let Some(entry) = self.cache.get(&key) else {
let cache = self.cache.read().unwrap();
let Some(entry) = cache.get(&key) else {
self.misses.fetch_add(1, Ordering::Relaxed);
return None;
};
// Move the entry to the end of the queue.
// self.queue is modified while holding a reference to the cache entry;
// so that another thread cannot write to the same key.
{
let mut queue = self.queue.lock().unwrap();
queue.remove(entry.index());
entry.set_index(queue.insert_last(key));
}
let ordinal = self.counter.fetch_add(1, Ordering::Relaxed);
entry.ordinal.fetch_max(ordinal, Ordering::Relaxed);
let account = entry.account.clone();
drop(entry);
drop(cache);
self.hits.fetch_add(1, Ordering::Relaxed);
Some(account)
});
Expand All @@ -106,51 +97,47 @@ impl ReadOnlyAccountsCache {
let key = (pubkey, slot);
let account_size = self.account_size(&account);
self.data_size.fetch_add(account_size, Ordering::Relaxed);
// self.queue is modified while holding a reference to the cache entry;
// so that another thread cannot write to the same key.
match self.cache.entry(key) {
let mut cache = self.cache.write().unwrap();
let ordinal = self.counter.fetch_add(1, Ordering::Relaxed);
match cache.entry(key) {
Entry::Vacant(entry) => {
// Insert the entry at the end of the queue.
let mut queue = self.queue.lock().unwrap();
let index = queue.insert_last(key);
entry.insert(ReadOnlyAccountCacheEntry::new(account, index));
let ordinal = AtomicU64::new(ordinal);
entry.insert(ReadOnlyAccountCacheEntry { account, ordinal });
}
Entry::Occupied(mut entry) => {
let entry = entry.get_mut();
let account_size = self.account_size(&entry.account);
self.data_size.fetch_sub(account_size, Ordering::Relaxed);
entry.account = account;
// Move the entry to the end of the queue.
let mut queue = self.queue.lock().unwrap();
queue.remove(entry.index());
entry.set_index(queue.insert_last(key));
entry.ordinal.store(ordinal, Ordering::Relaxed);
}
};
// Evict entries from the front of the queue.
let mut num_evicts = 0;
while self.data_size.load(Ordering::Relaxed) > self.max_data_size {
let Some(&(pubkey, slot)) = self.queue.lock().unwrap().get_first() else {
break;
};
while self.data_size.load(Ordering::Relaxed) > self.max_data_size && !cache.is_empty() {
let mut rng = rand::thread_rng();
let size = cache.len();
let (index, _) = std::iter::repeat_with(move || rng.gen_range(0, size))
.map(|index| (index, cache[index].ordinal.load(Ordering::Relaxed)))
.take(PRUNE_RANDOM_SAMPLE_SIZE)
.min_by_key(|&(_, ordinal)| ordinal)
.unwrap();
let (_, entry) = cache.swap_remove_index(index).unwrap();
let account_size = self.account_size(&entry.account);
self.data_size.fetch_sub(account_size, Ordering::Relaxed);
num_evicts += 1;
self.remove(pubkey, slot);
}
self.evicts.fetch_add(num_evicts, Ordering::Relaxed);
}

pub(crate) fn remove(&self, pubkey: Pubkey, slot: Slot) -> Option<AccountSharedData> {
let (_, entry) = self.cache.remove(&(pubkey, slot))?;
// self.queue should be modified only after removing the entry from the
// cache, so that this is still safe if another thread writes to the
// same key.
self.queue.lock().unwrap().remove(entry.index());
let entry = self.cache.write().unwrap().swap_remove(&(pubkey, slot))?;
let account_size = self.account_size(&entry.account);
self.data_size.fetch_sub(account_size, Ordering::Relaxed);
Some(entry.account)
}

pub(crate) fn cache_len(&self) -> usize {
self.cache.len()
self.cache.read().unwrap().len()
}

pub(crate) fn data_size(&self) -> usize {
Expand All @@ -167,34 +154,11 @@ impl ReadOnlyAccountsCache {
}
}

impl ReadOnlyAccountCacheEntry {
fn new(account: AccountSharedData, index: Index) -> Self {
let index = unsafe { std::mem::transmute::<Index, u32>(index) };
let index = AtomicU32::new(index);
Self { account, index }
}

#[inline]
fn index(&self) -> Index {
let index = self.index.load(Ordering::Relaxed);
unsafe { std::mem::transmute::<u32, Index>(index) }
}

#[inline]
fn set_index(&self, index: Index) {
let index = unsafe { std::mem::transmute::<Index, u32>(index) };
self.index.store(index, Ordering::Relaxed);
}
}

#[cfg(test)]
mod tests {
use {
super::*,
rand::{
seq::{IteratorRandom, SliceRandom},
Rng, SeedableRng,
},
rand::{seq::SliceRandom, Rng, SeedableRng},
rand_chacha::ChaChaRng,
solana_sdk::account::{accounts_equal, Account, WritableAccount},
std::{collections::HashMap, iter::repeat_with, sync::Arc},
Expand All @@ -207,6 +171,7 @@ mod tests {
}

#[test]
#[ignore]
fn test_read_only_accounts_cache() {
solana_logger::setup();
let per_account_size = CACHE_ENTRY_SIZE;
Expand Down Expand Up @@ -288,7 +253,11 @@ mod tests {
let mut hash_map = HashMap::<ReadOnlyCacheKey, (AccountSharedData, usize)>::new();
for ix in 0..1000 {
if rng.gen_bool(0.1) {
let key = *cache.cache.iter().choose(&mut rng).unwrap().key();
let key = {
let cache = cache.cache.read().unwrap();
let index = rng.gen_range(0, cache.len());
cache.get_index(index).map(|(key, _)| key).cloned().unwrap()
};
let (pubkey, slot) = key;
let account = cache.load(pubkey, slot).unwrap();
let (other, index) = hash_map.get_mut(&key).unwrap();
Expand All @@ -313,18 +282,9 @@ mod tests {
}
assert_eq!(cache.cache_len(), 17);
assert_eq!(hash_map.len(), 35);
let index = hash_map
.iter()
.filter(|(k, _)| cache.cache.contains_key(k))
.map(|(_, (_, ix))| *ix)
.min()
.unwrap();
for (key, (account, ix)) in hash_map {
let (pubkey, slot) = key;
assert_eq!(
cache.load(pubkey, slot),
if ix < index { None } else { Some(account) }
);
for (key, entry) in cache.cache.read().unwrap().iter() {
let (account, _) = hash_map.get(key).unwrap();
assert_eq!(&entry.account, account);
}
}
}