Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

improve multi executor cache addition (backport #22365) #22381

Merged
merged 1 commit into from
Jan 8, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 54 additions & 36 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,23 @@ impl CachedExecutors {
})
}

fn put(&mut self, pubkey: &Pubkey, executor: Arc<dyn Executor>) {
let entry = if let Some(mut entry) = self.executors.remove(pubkey) {
saturating_add_assign!(self.stats.hits, 1);
entry.executor = executor;
entry
} else {
saturating_add_assign!(self.stats.misses, 1);
fn put(&mut self, executors: &[(&Pubkey, Arc<dyn Executor>)]) {
let mut new_executors: Vec<_> = executors
.iter()
.filter_map(|(key, executor)| {
if let Some(mut entry) = self.executors.remove(key) {
saturating_add_assign!(self.stats.hits, 1);
entry.executor = executor.clone();
let _ = self.executors.insert(**key, entry);
None
} else {
saturating_add_assign!(self.stats.misses, 1);
Some((*key, executor))
}
})
.collect();

if !new_executors.is_empty() {
let mut counts = self
.executors
.iter()
Expand All @@ -460,10 +470,15 @@ impl CachedExecutors {
.collect::<Vec<_>>();
counts.sort_unstable_by_key(|(_, count)| *count);

let primer_count = Self::get_primer_count(counts.as_slice());
let primer_counts = Self::get_primer_counts(counts.as_slice(), new_executors.len());

if self.executors.len() >= self.max {
if let Some(least_key) = counts.first().map(|least| *least.0) {
let mut least_keys = counts
.iter()
.take(new_executors.len())
.map(|least| *least.0)
.collect::<Vec<_>>();
for least_key in least_keys.drain(..) {
let _ = self.executors.remove(&least_key);
self.stats
.evictions
Expand All @@ -473,13 +488,15 @@ impl CachedExecutors {
}
}

CachedExecutorsEntry {
prev_epoch_count: 0,
epoch_count: AtomicU64::new(primer_count),
executor,
for ((key, executor), primer_count) in new_executors.drain(..).zip(primer_counts) {
let entry = CachedExecutorsEntry {
prev_epoch_count: 0,
epoch_count: AtomicU64::new(primer_count),
executor: executor.clone(),
};
let _ = self.executors.insert(*key, entry);
}
};
let _ = self.executors.insert(*pubkey, entry);
}
}

fn remove(&mut self, pubkey: &Pubkey) {
Expand Down Expand Up @@ -512,10 +529,13 @@ impl CachedExecutors {
.unwrap_or(0)
}

fn get_primer_count(counts: &[(&Pubkey, u64)]) -> u64 {
fn get_primer_counts(counts: &[(&Pubkey, u64)], num_counts: usize) -> Vec<u64> {
let max_primer_count = Self::get_primer_count_upper_bound_inclusive(counts);
let mut rng = rand::thread_rng();
rng.gen_range(0, max_primer_count.saturating_add(1))

(0..num_counts)
.map(|_| rng.gen_range(0, max_primer_count.saturating_add(1)))
.collect::<Vec<_>>()
}
}

Expand Down Expand Up @@ -3425,9 +3445,7 @@ impl Bank {
if !dirty_executors.is_empty() {
let mut cache = self.cached_executors.write().unwrap();
let cache = Arc::make_mut(&mut cache);
for (key, executor) in dirty_executors.into_iter() {
cache.put(key, executor);
}
cache.put(&dirty_executors);
}
}

Expand Down Expand Up @@ -11924,17 +11942,17 @@ pub(crate) mod tests {
let executor: Arc<dyn Executor> = Arc::new(TestExecutor {});
let mut cache = CachedExecutors::new(3, 0);

cache.put(&key1, executor.clone());
cache.put(&key2, executor.clone());
cache.put(&key3, executor.clone());
cache.put(&[(&key1, executor.clone())]);
cache.put(&[(&key2, executor.clone())]);
cache.put(&[(&key3, executor.clone())]);
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key3).is_some());

assert!(cache.get(&key1).is_some());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key2).is_some());
cache.put(&key4, executor.clone());
cache.put(&[(&key4, executor.clone())]);
assert!(cache.get(&key4).is_some());
let num_retained = [&key1, &key2, &key3]
.iter()
Expand All @@ -11946,7 +11964,7 @@ pub(crate) mod tests {
assert!(cache.get(&key4).is_some());
assert!(cache.get(&key4).is_some());
assert!(cache.get(&key4).is_some());
cache.put(&key3, executor.clone());
cache.put(&[(&key3, executor.clone())]);
assert!(cache.get(&key3).is_some());
let num_retained = [&key1, &key2, &key4]
.iter()
Expand All @@ -11957,7 +11975,7 @@ pub(crate) mod tests {
}

#[test]
fn test_cached_executors_eviction() {
fn test_cached_executor_eviction() {
let key1 = solana_sdk::pubkey::new_rand();
let key2 = solana_sdk::pubkey::new_rand();
let key3 = solana_sdk::pubkey::new_rand();
Expand All @@ -11966,9 +11984,9 @@ pub(crate) mod tests {
let mut cache = CachedExecutors::new(3, 0);
assert!(cache.current_epoch == 0);

cache.put(&key1, executor.clone());
cache.put(&key2, executor.clone());
cache.put(&key3, executor.clone());
cache.put(&[(&key1, executor.clone())]);
cache.put(&[(&key2, executor.clone())]);
cache.put(&[(&key3, executor.clone())]);
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key1).is_some());
Expand All @@ -11979,7 +11997,7 @@ pub(crate) mod tests {
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key3).is_some());
Arc::make_mut(&mut cache).put(&key4, executor.clone());
Arc::make_mut(&mut cache).put(&[(&key4, executor.clone())]);

assert!(cache.get(&key4).is_some());
let num_retained = [&key1, &key2, &key3]
Expand All @@ -11989,8 +12007,8 @@ pub(crate) mod tests {
.count();
assert_eq!(num_retained, 2);

Arc::make_mut(&mut cache).put(&key1, executor.clone());
Arc::make_mut(&mut cache).put(&key3, executor.clone());
Arc::make_mut(&mut cache).put(&[(&key1, executor.clone())]);
Arc::make_mut(&mut cache).put(&[(&key3, executor.clone())]);
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key3).is_some());
let num_retained = [&key2, &key4]
Expand All @@ -12003,7 +12021,7 @@ pub(crate) mod tests {
cache = cache.clone_with_epoch(2);
assert!(cache.current_epoch == 2);

Arc::make_mut(&mut cache).put(&key3, executor.clone());
Arc::make_mut(&mut cache).put(&[(&key3, executor.clone())]);
assert!(cache.get(&key3).is_some());
}

Expand All @@ -12015,11 +12033,11 @@ pub(crate) mod tests {
let executor: Arc<dyn Executor> = Arc::new(TestExecutor {});
let mut cache = CachedExecutors::new(2, 0);

cache.put(&key1, executor.clone());
cache.put(&[(&key1, executor.clone())]);
for _ in 0..5 {
let _ = cache.get(&key1);
}
cache.put(&key2, executor.clone());
cache.put(&[(&key2, executor.clone())]);
// make key1's use-count for sure greater than key2's
let _ = cache.get(&key1);

Expand All @@ -12031,7 +12049,7 @@ pub(crate) mod tests {
entries.sort_by_key(|(_, v)| *v);
assert!(entries[0].1 < entries[1].1);

cache.put(&key3, executor.clone());
cache.put(&[(&key3, executor.clone())]);
assert!(cache.get(&entries[0].0).is_none());
assert!(cache.get(&entries[1].0).is_some());
}
Expand Down