Skip to content

Commit

Permalink
improve multi executor cache addition (#22381)
Browse files Browse the repository at this point in the history
(cherry picked from commit 4a9f4e2)

Co-authored-by: Jack May <[email protected]>
  • Loading branch information
mergify[bot] and jackcmay authored Jan 8, 2022
1 parent 8a470d3 commit fee8672
Showing 1 changed file with 54 additions and 36 deletions.
90 changes: 54 additions & 36 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,23 @@ impl CachedExecutors {
})
}

fn put(&mut self, pubkey: &Pubkey, executor: Arc<dyn Executor>) {
let entry = if let Some(mut entry) = self.executors.remove(pubkey) {
saturating_add_assign!(self.stats.hits, 1);
entry.executor = executor;
entry
} else {
saturating_add_assign!(self.stats.misses, 1);
fn put(&mut self, executors: &[(&Pubkey, Arc<dyn Executor>)]) {
let mut new_executors: Vec<_> = executors
.iter()
.filter_map(|(key, executor)| {
if let Some(mut entry) = self.executors.remove(key) {
saturating_add_assign!(self.stats.hits, 1);
entry.executor = executor.clone();
let _ = self.executors.insert(**key, entry);
None
} else {
saturating_add_assign!(self.stats.misses, 1);
Some((*key, executor))
}
})
.collect();

if !new_executors.is_empty() {
let mut counts = self
.executors
.iter()
Expand All @@ -460,10 +470,15 @@ impl CachedExecutors {
.collect::<Vec<_>>();
counts.sort_unstable_by_key(|(_, count)| *count);

let primer_count = Self::get_primer_count(counts.as_slice());
let primer_counts = Self::get_primer_counts(counts.as_slice(), new_executors.len());

if self.executors.len() >= self.max {
if let Some(least_key) = counts.first().map(|least| *least.0) {
let mut least_keys = counts
.iter()
.take(new_executors.len())
.map(|least| *least.0)
.collect::<Vec<_>>();
for least_key in least_keys.drain(..) {
let _ = self.executors.remove(&least_key);
self.stats
.evictions
Expand All @@ -473,13 +488,15 @@ impl CachedExecutors {
}
}

CachedExecutorsEntry {
prev_epoch_count: 0,
epoch_count: AtomicU64::new(primer_count),
executor,
for ((key, executor), primer_count) in new_executors.drain(..).zip(primer_counts) {
let entry = CachedExecutorsEntry {
prev_epoch_count: 0,
epoch_count: AtomicU64::new(primer_count),
executor: executor.clone(),
};
let _ = self.executors.insert(*key, entry);
}
};
let _ = self.executors.insert(*pubkey, entry);
}
}

fn remove(&mut self, pubkey: &Pubkey) {
Expand Down Expand Up @@ -512,10 +529,13 @@ impl CachedExecutors {
.unwrap_or(0)
}

fn get_primer_count(counts: &[(&Pubkey, u64)]) -> u64 {
fn get_primer_counts(counts: &[(&Pubkey, u64)], num_counts: usize) -> Vec<u64> {
let max_primer_count = Self::get_primer_count_upper_bound_inclusive(counts);
let mut rng = rand::thread_rng();
rng.gen_range(0, max_primer_count.saturating_add(1))

(0..num_counts)
.map(|_| rng.gen_range(0, max_primer_count.saturating_add(1)))
.collect::<Vec<_>>()
}
}

Expand Down Expand Up @@ -3425,9 +3445,7 @@ impl Bank {
if !dirty_executors.is_empty() {
let mut cache = self.cached_executors.write().unwrap();
let cache = Arc::make_mut(&mut cache);
for (key, executor) in dirty_executors.into_iter() {
cache.put(key, executor);
}
cache.put(&dirty_executors);
}
}

Expand Down Expand Up @@ -11924,17 +11942,17 @@ pub(crate) mod tests {
let executor: Arc<dyn Executor> = Arc::new(TestExecutor {});
let mut cache = CachedExecutors::new(3, 0);

cache.put(&key1, executor.clone());
cache.put(&key2, executor.clone());
cache.put(&key3, executor.clone());
cache.put(&[(&key1, executor.clone())]);
cache.put(&[(&key2, executor.clone())]);
cache.put(&[(&key3, executor.clone())]);
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key3).is_some());

assert!(cache.get(&key1).is_some());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key2).is_some());
cache.put(&key4, executor.clone());
cache.put(&[(&key4, executor.clone())]);
assert!(cache.get(&key4).is_some());
let num_retained = [&key1, &key2, &key3]
.iter()
Expand All @@ -11946,7 +11964,7 @@ pub(crate) mod tests {
assert!(cache.get(&key4).is_some());
assert!(cache.get(&key4).is_some());
assert!(cache.get(&key4).is_some());
cache.put(&key3, executor.clone());
cache.put(&[(&key3, executor.clone())]);
assert!(cache.get(&key3).is_some());
let num_retained = [&key1, &key2, &key4]
.iter()
Expand All @@ -11957,7 +11975,7 @@ pub(crate) mod tests {
}

#[test]
fn test_cached_executors_eviction() {
fn test_cached_executor_eviction() {
let key1 = solana_sdk::pubkey::new_rand();
let key2 = solana_sdk::pubkey::new_rand();
let key3 = solana_sdk::pubkey::new_rand();
Expand All @@ -11966,9 +11984,9 @@ pub(crate) mod tests {
let mut cache = CachedExecutors::new(3, 0);
assert!(cache.current_epoch == 0);

cache.put(&key1, executor.clone());
cache.put(&key2, executor.clone());
cache.put(&key3, executor.clone());
cache.put(&[(&key1, executor.clone())]);
cache.put(&[(&key2, executor.clone())]);
cache.put(&[(&key3, executor.clone())]);
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key1).is_some());
Expand All @@ -11979,7 +11997,7 @@ pub(crate) mod tests {
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key3).is_some());
Arc::make_mut(&mut cache).put(&key4, executor.clone());
Arc::make_mut(&mut cache).put(&[(&key4, executor.clone())]);

assert!(cache.get(&key4).is_some());
let num_retained = [&key1, &key2, &key3]
Expand All @@ -11989,8 +12007,8 @@ pub(crate) mod tests {
.count();
assert_eq!(num_retained, 2);

Arc::make_mut(&mut cache).put(&key1, executor.clone());
Arc::make_mut(&mut cache).put(&key3, executor.clone());
Arc::make_mut(&mut cache).put(&[(&key1, executor.clone())]);
Arc::make_mut(&mut cache).put(&[(&key3, executor.clone())]);
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key3).is_some());
let num_retained = [&key2, &key4]
Expand All @@ -12003,7 +12021,7 @@ pub(crate) mod tests {
cache = cache.clone_with_epoch(2);
assert!(cache.current_epoch == 2);

Arc::make_mut(&mut cache).put(&key3, executor.clone());
Arc::make_mut(&mut cache).put(&[(&key3, executor.clone())]);
assert!(cache.get(&key3).is_some());
}

Expand All @@ -12015,11 +12033,11 @@ pub(crate) mod tests {
let executor: Arc<dyn Executor> = Arc::new(TestExecutor {});
let mut cache = CachedExecutors::new(2, 0);

cache.put(&key1, executor.clone());
cache.put(&[(&key1, executor.clone())]);
for _ in 0..5 {
let _ = cache.get(&key1);
}
cache.put(&key2, executor.clone());
cache.put(&[(&key2, executor.clone())]);
// make key1's use-count for sure greater than key2's
let _ = cache.get(&key1);

Expand All @@ -12031,7 +12049,7 @@ pub(crate) mod tests {
entries.sort_by_key(|(_, v)| *v);
assert!(entries[0].1 < entries[1].1);

cache.put(&key3, executor.clone());
cache.put(&[(&key3, executor.clone())]);
assert!(cache.get(&entries[0].0).is_none());
assert!(cache.get(&entries[1].0).is_some());
}
Expand Down

0 comments on commit fee8672

Please sign in to comment.