program cache: reduce contention
Before this change we used to take the write lock to call extract(). This
meant that even in the ideal case (all programs already cached), the cache
was contended by all batches and all operations were serialized.

With this change we now take the write lock only when we store a new
entry in the cache, and only the read lock to call extract(). This means
that in the common case where most/all programs are cached, there is no
contention and all batches progress in parallel.

This improves node replay performance by 20-25% on current mainnet-beta (mnb) traffic.
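
To illustrate the locking pattern described above, here is a minimal, self-contained sketch with simplified stand-in types (`Cache`, `CachedProgram`) rather than the real ProgramCache API: lookups only need the read lock, and the write lock is taken just to publish a newly loaded entry.

```rust
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};

// Hypothetical, simplified stand-ins for the real cache types.
type Pubkey = [u8; 32];
type CachedProgram = Arc<&'static str>;

#[derive(Default)]
struct Cache {
    entries: HashMap<Pubkey, CachedProgram>,
}

impl Cache {
    // The lookup path takes &self, so callers only need a read lock.
    fn extract(&self, key: &Pubkey) -> Option<CachedProgram> {
        self.entries.get(key).cloned()
    }

    // Storing a newly loaded program takes &mut self, i.e. the write lock.
    fn store(&mut self, key: Pubkey, program: CachedProgram) {
        self.entries.insert(key, program);
    }
}

fn main() {
    let cache = Arc::new(RwLock::new(Cache::default()));
    cache.write().unwrap().store([0; 32], Arc::new("program"));

    // The hot path (all programs already cached) runs under the read lock,
    // so many batches can look up programs in parallel.
    assert!(cache.read().unwrap().extract(&[0; 32]).is_some());
}
```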
alessandrod committed May 5, 2024
1 parent b0cfffb commit b688cac
Showing 2 changed files with 59 additions and 60 deletions.
46 changes: 22 additions & 24 deletions program-runtime/src/loaded_programs.rs
@@ -21,7 +21,7 @@ use {
saturating_add_assign,
},
std::{
collections::HashMap,
collections::{hash_map::Entry, HashMap},
fmt::{Debug, Formatter},
sync::{
atomic::{AtomicU64, Ordering},
@@ -583,11 +583,6 @@ impl LoadingTaskWaiter {
struct SecondLevel {
/// List of all versions (across all forks) of a program sorted by the slot in which they were modified
slot_versions: Vec<Arc<ProgramCacheEntry>>,
/// `Some` if there is currently a cooperative loading task for this program address
///
/// It is possible that multiple TX batches from different slots need different versions of a program.
/// However, that can only be figured out once a program is loaded and its deployment slot is known.
cooperative_loading_lock: Option<(Slot, std::thread::ThreadId)>,
}

/// This structure is the global cache of loaded, verified and compiled programs.
@@ -610,6 +605,15 @@ pub struct ProgramCache<FG: ForkGraph> {
///
/// The first level is for the address at which programs are deployed and the second level for the slot (and thus also fork).
entries: HashMap<Pubkey, SecondLevel>,
/// The entries that are getting loaded and have not yet finished loading.
///
/// The key is the program address, the value is a tuple of the slot in which the program is
/// being loaded and the thread ID doing the load.
///
/// It is possible that multiple TX batches from different slots need different versions of a
/// program. However, the deployment slot of a program is only known after it has
/// been loaded, so all loads for a given program key are serialized.
loading_entries: Mutex<HashMap<Pubkey, (Slot, std::thread::ThreadId)>>,
/// The slot of the last rerooting
pub latest_root_slot: Slot,
/// The epoch of the last rerooting
@@ -776,6 +780,7 @@ impl<FG: ForkGraph> ProgramCache<FG> {
pub fn new(root_slot: Slot, root_epoch: Epoch) -> Self {
Self {
entries: HashMap::new(),
loading_entries: Mutex::new(HashMap::new()),
latest_root_slot: root_slot,
latest_root_epoch: root_epoch,
environments: ProgramRuntimeEnvironments::default(),
@@ -974,7 +979,7 @@ impl<FG: ForkGraph> ProgramCache<FG> {
/// Extracts a subset of the programs relevant to a transaction batch
/// and returns which program accounts the accounts DB needs to load.
pub fn extract(
&mut self,
&self,
search_for: &mut Vec<(Pubkey, (ProgramCacheMatchCriteria, u64))>,
loaded_programs_for_tx_batch: &mut ProgramCacheForTxBatch,
is_first_round: bool,
@@ -983,7 +988,7 @@ impl<FG: ForkGraph> ProgramCache<FG> {
let locked_fork_graph = self.fork_graph.as_ref().unwrap().read().unwrap();
let mut cooperative_loading_task = None;
search_for.retain(|(key, (match_criteria, usage_count))| {
if let Some(second_level) = self.entries.get_mut(key) {
if let Some(second_level) = self.entries.get(key) {
for entry in second_level.slot_versions.iter().rev() {
if entry.deployment_slot <= self.latest_root_slot
|| matches!(
@@ -1033,15 +1038,14 @@ impl<FG: ForkGraph> ProgramCache<FG> {
}
}
if cooperative_loading_task.is_none() {
// We have not selected a task so far
let second_level = self.entries.entry(*key).or_default();
if second_level.cooperative_loading_lock.is_none() {
// Select this missing entry which is not selected by any other TX batch yet
cooperative_loading_task = Some((*key, *usage_count));
second_level.cooperative_loading_lock = Some((
let mut loading_entries = self.loading_entries.lock().unwrap();
let entry = loading_entries.entry(*key);
if let Entry::Vacant(entry) = entry {
entry.insert((
loaded_programs_for_tx_batch.slot,
std::thread::current().id(),
));
cooperative_loading_task = Some((*key, *usage_count));
}
}
true
@@ -1066,12 +1070,8 @@
key: Pubkey,
loaded_program: Arc<ProgramCacheEntry>,
) -> bool {
let second_level = self.entries.entry(key).or_default();
debug_assert_eq!(
second_level.cooperative_loading_lock,
Some((slot, std::thread::current().id()))
);
second_level.cooperative_loading_lock = None;
let loading_thread = self.loading_entries.lock().unwrap().remove(&key);
debug_assert_eq!(loading_thread, Some((slot, std::thread::current().id())));
// Check that it will be visible to our own fork once inserted
if loaded_program.deployment_slot > self.latest_root_slot
&& !matches!(
@@ -1237,10 +1237,8 @@ impl<FG: ForkGraph> ProgramCache<FG> {

fn remove_programs_with_no_entries(&mut self) {
let num_programs_before_removal = self.entries.len();
self.entries.retain(|_, second_level| {
!second_level.slot_versions.is_empty()
|| second_level.cooperative_loading_lock.is_some()
});
self.entries
.retain(|_key, second_level| !second_level.slot_versions.is_empty());
if self.entries.len() < num_programs_before_removal {
self.stats.empty_entries.fetch_add(
num_programs_before_removal.saturating_sub(self.entries.len()) as u64,
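
The diff above replaces the per-program cooperative_loading_lock with a separate loading_entries map behind its own Mutex, so a load can be claimed while the cache itself is only read-locked. Below is a minimal sketch of that claim/release pattern, using hypothetical simplified types rather than the real ProgramCache and its entry types.

```rust
use std::{
    collections::{hash_map::Entry, HashMap},
    sync::Mutex,
    thread::{self, ThreadId},
};

type Pubkey = [u8; 32];
type Slot = u64;

// Simplified stand-in for the new `loading_entries` field.
struct LoadingEntries {
    inner: Mutex<HashMap<Pubkey, (Slot, ThreadId)>>,
}

impl LoadingEntries {
    // Claim the load of `key` for the current thread, but only if no other
    // batch has already claimed it (the Entry::Vacant check in extract()).
    fn try_claim(&self, key: Pubkey, slot: Slot) -> bool {
        match self.inner.lock().unwrap().entry(key) {
            Entry::Vacant(entry) => {
                entry.insert((slot, thread::current().id()));
                true
            }
            Entry::Occupied(_) => false,
        }
    }

    // Release the claim once the load has finished, mirroring
    // finish_cooperative_loading_task().
    fn release(&self, key: &Pubkey, slot: Slot) {
        let previous = self.inner.lock().unwrap().remove(key);
        debug_assert_eq!(previous, Some((slot, thread::current().id())));
    }
}

fn main() {
    let loading = LoadingEntries {
        inner: Mutex::new(HashMap::new()),
    };
    let key = [0u8; 32];
    assert!(loading.try_claim(key, 42));
    assert!(!loading.try_claim(key, 42)); // a second batch has to wait instead
    loading.release(&key, 42);
}
```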
73 changes: 37 additions & 36 deletions svm/src/transaction_processor.rs
@@ -387,11 +387,10 @@ impl<FG: ForkGraph> TransactionBatchProcessor<FG> {
.collect();

let mut loaded_programs_for_txs = None;
let mut program_to_store = None;
loop {
let (program_to_load, task_cookie, task_waiter) = {
let (program_to_store, task_cookie, task_waiter) = {
// Lock the global cache.
let mut program_cache = self.program_cache.write().unwrap();
let program_cache = self.program_cache.read().unwrap();
// Initialize our local cache.
let is_first_round = loaded_programs_for_txs.is_none();
if is_first_round {
@@ -401,49 +400,51 @@
&program_cache,
));
}
// Submit our last completed loading task.
if let Some((key, program)) = program_to_store.take() {
loaded_programs_for_txs.as_mut().unwrap().loaded_missing = true;
if program_cache.finish_cooperative_loading_task(self.slot, key, program)
&& limit_to_load_programs
{
// This branch is taken when there is an error in assigning a program to a
// cache slot. It is not possible to mock this error for SVM unit
// tests purposes.
let mut ret = ProgramCacheForTxBatch::new_from_cache(
self.slot,
self.epoch,
&program_cache,
);
ret.hit_max_limit = true;
return ret;
}
}
// Figure out which program needs to be loaded next.
let program_to_load = program_cache.extract(
&mut missing_programs,
loaded_programs_for_txs.as_mut().unwrap(),
is_first_round,
);

let program_to_store = program_to_load.map(|(key, count)| {
// Load, verify and compile one program.
let program = load_program_with_pubkey(
callback,
&program_cache,
&key,
self.slot,
self.epoch,
&self.epoch_schedule,
false,
)
.expect("called load_program_with_pubkey() with nonexistent account");
program.tx_usage_counter.store(count, Ordering::Relaxed);
(key, program)
});

let task_waiter = Arc::clone(&program_cache.loading_task_waiter);
(program_to_load, task_waiter.cookie(), task_waiter)
(program_to_store, task_waiter.cookie(), task_waiter)
// Unlock the global cache again.
};

if let Some((key, count)) = program_to_load {
// Load, verify and compile one program.
let program = load_program_with_pubkey(
callback,
&self.program_cache.read().unwrap(),
&key,
self.slot,
self.epoch,
&self.epoch_schedule,
false,
)
.expect("called load_program_with_pubkey() with nonexistent account");
program.tx_usage_counter.store(count, Ordering::Relaxed);
program_to_store = Some((key, program));
if let Some((key, program)) = program_to_store {
let mut program_cache = self.program_cache.write().unwrap();
// Submit our last completed loading task.
if program_cache.finish_cooperative_loading_task(self.slot, key, program)
&& limit_to_load_programs
{
// This branch is taken when there is an error in assigning a program to a
// cache slot. It is not possible to mock this error for SVM unit
// tests purposes.
let mut ret = ProgramCacheForTxBatch::new_from_cache(
self.slot,
self.epoch,
&program_cache,
);
ret.hit_max_limit = true;
return ret;
}
} else if missing_programs.is_empty() {
break;
} else {
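
In transaction_processor.rs the loop is reordered: extraction and loading now happen while only the read lock is held, and the write lock is taken afterwards just to store the result. A rough, single-threaded sketch of that loop shape follows, with stand-in types instead of the real SVM ones and without the loading_entries bookkeeping or the task waiter.

```rust
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};

type Pubkey = u8;
type Program = Arc<String>;

// Hypothetical stand-ins for the cache and loader driven by the real loop.
#[derive(Default)]
struct Cache {
    entries: HashMap<Pubkey, Program>,
}

impl Cache {
    // Returns the next program this batch still needs to load, if any.
    fn extract(&self, missing: &mut Vec<Pubkey>) -> Option<Pubkey> {
        missing.retain(|key| !self.entries.contains_key(key));
        missing.first().copied()
    }

    fn store(&mut self, key: Pubkey, program: Program) {
        self.entries.insert(key, program);
    }
}

fn load_program(key: Pubkey) -> Program {
    Arc::new(format!("compiled program {key}"))
}

fn main() {
    let program_cache = Arc::new(RwLock::new(Cache::default()));
    let mut missing: Vec<Pubkey> = vec![1, 2, 3];

    loop {
        // Extract and load while holding only the read lock, so batches that
        // hit the cache are not blocked behind this load.
        let program_to_store = {
            let cache = program_cache.read().unwrap();
            cache
                .extract(&mut missing)
                .map(|key| (key, load_program(key)))
        };

        if let Some((key, program)) = program_to_store {
            // The write lock is taken only briefly, to publish the new entry.
            program_cache.write().unwrap().store(key, program);
        } else if missing.is_empty() {
            break;
        } else {
            // In the real code another batch owns the load and we would block
            // on the loading task waiter here.
        }
    }

    assert_eq!(program_cache.read().unwrap().entries.len(), 3);
}
```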
