Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
bank: prime new executor cache entry use-counts (#22374)
Browse files Browse the repository at this point in the history
Co-authored-by: Trent Nelson <[email protected]>
  • Loading branch information
mergify[bot] and t-nelson authored Jan 8, 2022
1 parent 7cbfab0 commit 8c6df1f
Showing 1 changed file with 129 additions and 28 deletions.
157 changes: 129 additions & 28 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ use {
dashmap::DashMap,
itertools::Itertools,
log::*,
rand::Rng,
rayon::{
iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator},
ThreadPool, ThreadPoolBuilder,
Expand Down Expand Up @@ -132,7 +133,7 @@ use {
collections::{HashMap, HashSet},
convert::{TryFrom, TryInto},
fmt, mem,
ops::RangeInclusive,
ops::{Div, RangeInclusive},
path::PathBuf,
ptr,
rc::Rc,
Expand Down Expand Up @@ -360,7 +361,7 @@ struct CachedExecutorsEntry {
struct CachedExecutors {
max: usize,
current_epoch: Epoch,
executors: HashMap<Pubkey, CachedExecutorsEntry>,
pub(self) executors: HashMap<Pubkey, CachedExecutorsEntry>,
stats: executor_cache::Stats,
}
impl Default for CachedExecutors {
Expand Down Expand Up @@ -449,29 +450,32 @@ impl CachedExecutors {
entry
} else {
saturating_add_assign!(self.stats.misses, 1);
if self.executors.len() >= self.max {
let mut least = u64::MAX;
let default_key = Pubkey::default();
let mut least_key = &default_key;

for (key, entry) in self.executors.iter() {
let mut counts = self
.executors
.iter()
.map(|(key, entry)| {
let count = entry.prev_epoch_count + entry.epoch_count.load(Relaxed);
if count < least {
least = count;
least_key = key;
}
(key, count)
})
.collect::<Vec<_>>();
counts.sort_unstable_by_key(|(_, count)| *count);

let primer_count = Self::get_primer_count(counts.as_slice());

if self.executors.len() >= self.max {
if let Some(least_key) = counts.first().map(|least| *least.0) {
let _ = self.executors.remove(&least_key);
self.stats
.evictions
.entry(least_key)
.and_modify(|c| saturating_add_assign!(*c, 1))
.or_insert(1);
}
let least_key = *least_key;
let _ = self.executors.remove(&least_key);
self.stats
.evictions
.entry(least_key)
.and_modify(|c| saturating_add_assign!(*c, 1))
.or_insert(1);
}

CachedExecutorsEntry {
prev_epoch_count: 0,
epoch_count: AtomicU64::new(0),
epoch_count: AtomicU64::new(primer_count),
executor,
}
};
Expand All @@ -481,6 +485,38 @@ impl CachedExecutors {
fn remove(&mut self, pubkey: &Pubkey) {
let _ = self.executors.remove(pubkey);
}

fn get_primer_count_upper_bound_inclusive(counts: &[(&Pubkey, u64)]) -> u64 {
const PRIMER_COUNT_TARGET_PERCENTILE: u64 = 85;
#[allow(clippy::assertions_on_constants)]
{
assert!(PRIMER_COUNT_TARGET_PERCENTILE <= 100);
}
// Executor use-frequencies are assumed to fit a Pareto distribution. Choose an
// upper-bound for our primer count as the actual count at the target rank to avoid
// an upward bias

let target_index = u64::try_from(counts.len().saturating_sub(1))
.ok()
.and_then(|counts| {
let index = counts
.saturating_mul(PRIMER_COUNT_TARGET_PERCENTILE)
.div(100); // switch to u64::saturating_div once stable
usize::try_from(index).ok()
})
.unwrap_or(0);

counts
.get(target_index)
.map(|(_, count)| *count)
.unwrap_or(0)
}

fn get_primer_count(counts: &[(&Pubkey, u64)]) -> u64 {
let max_primer_count = Self::get_primer_count_upper_bound_inclusive(counts);
let mut rng = rand::thread_rng();
rng.gen_range(0, max_primer_count.saturating_add(1))
}
}

pub struct TxComputeMeter {
Expand Down Expand Up @@ -11899,19 +11935,25 @@ pub(crate) mod tests {
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key2).is_some());
cache.put(&key4, executor.clone());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key3).is_none());
assert!(cache.get(&key4).is_some());
let num_retained = [&key1, &key2, &key3]
.iter()
.map(|key| cache.get(key))
.flatten()
.count();
assert_eq!(num_retained, 2);

assert!(cache.get(&key4).is_some());
assert!(cache.get(&key4).is_some());
assert!(cache.get(&key4).is_some());
cache.put(&key3, executor.clone());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key2).is_none());
assert!(cache.get(&key3).is_some());
assert!(cache.get(&key4).is_some());
let num_retained = [&key1, &key2, &key4]
.iter()
.map(|key| cache.get(key))
.flatten()
.count();
assert_eq!(num_retained, 2);
}

#[test]
Expand Down Expand Up @@ -11940,12 +11982,23 @@ pub(crate) mod tests {
Arc::make_mut(&mut cache).put(&key4, executor.clone());

assert!(cache.get(&key4).is_some());
assert!(cache.get(&key3).is_none());
let num_retained = [&key1, &key2, &key3]
.iter()
.map(|key| cache.get(key))
.flatten()
.count();
assert_eq!(num_retained, 2);

Arc::make_mut(&mut cache).put(&key1, executor.clone());
Arc::make_mut(&mut cache).put(&key3, executor.clone());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key4).is_none());
assert!(cache.get(&key3).is_some());
let num_retained = [&key2, &key4]
.iter()
.map(|key| cache.get(key))
.flatten()
.count();
assert_eq!(num_retained, 1);

cache = cache.clone_with_epoch(2);
assert!(cache.current_epoch == 2);
Expand All @@ -11954,6 +12007,35 @@ pub(crate) mod tests {
assert!(cache.get(&key3).is_some());
}

#[test]
fn test_cached_executors_evicts_smallest() {
let key1 = solana_sdk::pubkey::new_rand();
let key2 = solana_sdk::pubkey::new_rand();
let key3 = solana_sdk::pubkey::new_rand();
let executor: Arc<dyn Executor> = Arc::new(TestExecutor {});
let mut cache = CachedExecutors::new(2, 0);

cache.put(&key1, executor.clone());
for _ in 0..5 {
let _ = cache.get(&key1);
}
cache.put(&key2, executor.clone());
// make key1's use-count for sure greater than key2's
let _ = cache.get(&key1);

let mut entries = cache
.executors
.iter()
.map(|(k, v)| (*k, v.epoch_count.load(Relaxed)))
.collect::<Vec<_>>();
entries.sort_by_key(|(_, v)| *v);
assert!(entries[0].1 < entries[1].1);

cache.put(&key3, executor.clone());
assert!(cache.get(&entries[0].0).is_none());
assert!(cache.get(&entries[1].0).is_some());
}

#[test]
fn test_bank_executor_cache() {
solana_logger::setup();
Expand Down Expand Up @@ -14173,4 +14255,23 @@ pub(crate) mod tests {
Some(Vec::<TransactionLogInfo>::new()),
);
}
#[test]
fn test_executor_cache_get_primer_count_upper_bound_inclusive() {
let pubkey = Pubkey::default();
let v = [];
assert_eq!(
CachedExecutors::get_primer_count_upper_bound_inclusive(&v),
0
);
let v = [(&pubkey, 1)];
assert_eq!(
CachedExecutors::get_primer_count_upper_bound_inclusive(&v),
1
);
let v = (0u64..10).map(|i| (&pubkey, i)).collect::<Vec<_>>();
assert_eq!(
CachedExecutors::get_primer_count_upper_bound_inclusive(v.as_slice()),
7
);
}
}

0 comments on commit 8c6df1f

Please sign in to comment.