Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

program cache: reduce contention #1192

Merged
merged 2 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 35 additions & 51 deletions program-runtime/src/loaded_programs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use {
saturating_add_assign,
},
std::{
collections::HashMap,
collections::{hash_map::Entry, HashMap},
fmt::{Debug, Formatter},
sync::{
atomic::{AtomicU64, Ordering},
Expand Down Expand Up @@ -227,7 +227,7 @@ pub struct ProgramCacheStats {
pub prunes_orphan: AtomicU64,
/// a program got pruned because it was not recompiled for the next epoch
pub prunes_environment: AtomicU64,
/// the [SecondLevel] was empty because all slot versions got pruned
/// a program had no entries because all slot versions got pruned
pub empty_entries: AtomicU64,
}

Expand Down Expand Up @@ -578,18 +578,6 @@ impl LoadingTaskWaiter {
}
}

/// Contains all the program versions at a specific address.
#[derive(Debug, Default)]
struct SecondLevel {
/// List of all versions (across all forks) of a program sorted by the slot in which they were modified
slot_versions: Vec<Arc<ProgramCacheEntry>>,
/// `Some` if there is currently a cooperative loading task for this program address
///
/// It is possible that multiple TX batches from different slots need different versions of a program.
/// However, that can only be figured out once a program is loaded and its deployment slot is known.
cooperative_loading_lock: Option<(Slot, std::thread::ThreadId)>,
}

/// This structure is the global cache of loaded, verified and compiled programs.
///
/// It ...
Expand All @@ -608,8 +596,18 @@ struct SecondLevel {
pub struct ProgramCache<FG: ForkGraph> {
/// A two level index:
///
/// The first level is for the address at which programs are deployed and the second level for the slot (and thus also fork).
entries: HashMap<Pubkey, SecondLevel>,
/// - the first level is for the address at which programs are deployed
/// - the second level for the slot (and thus also fork), sorted by slot number.
entries: HashMap<Pubkey, Vec<Arc<ProgramCacheEntry>>>,
/// The entries that are getting loaded and have not yet finished loading.
///
/// The key is the program address, the value is a tuple of the slot in which the program is
/// being loaded and the thread ID doing the load.
///
/// It is possible that multiple TX batches from different slots need different versions of a
/// program. The deployment slot of a program is only known after load tho,
/// so all loads for a given program key are serialized.
loading_entries: Mutex<HashMap<Pubkey, (Slot, std::thread::ThreadId)>>,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the replacement of the index structure I am working on I also experimented with pulling cooperative_loading_lock out of the SecondLevel. We might be able to take this even further because when we use the account last-modified-slot we could also parallelize the loading of missing programs. Basically the "The deployment slot of a program is only known after load tho" part could be improved then.

/// The slot of the last rerooting
pub latest_root_slot: Slot,
/// The epoch of the last rerooting
Expand Down Expand Up @@ -776,6 +774,7 @@ impl<FG: ForkGraph> ProgramCache<FG> {
pub fn new(root_slot: Slot, root_epoch: Epoch) -> Self {
Self {
entries: HashMap::new(),
loading_entries: Mutex::new(HashMap::new()),
latest_root_slot: root_slot,
latest_root_epoch: root_epoch,
environments: ProgramRuntimeEnvironments::default(),
Expand Down Expand Up @@ -819,7 +818,7 @@ impl<FG: ForkGraph> ProgramCache<FG> {
&entry.program,
ProgramCacheEntryType::DelayVisibility
));
let slot_versions = &mut self.entries.entry(key).or_default().slot_versions;
let slot_versions = &mut self.entries.entry(key).or_default();
match slot_versions.binary_search_by(|at| {
at.effective_slot
.cmp(&entry.effective_slot)
Expand Down Expand Up @@ -860,9 +859,7 @@ impl<FG: ForkGraph> ProgramCache<FG> {

pub fn prune_by_deployment_slot(&mut self, slot: Slot) {
for second_level in self.entries.values_mut() {
second_level
.slot_versions
.retain(|entry| entry.deployment_slot != slot);
second_level.retain(|entry| entry.deployment_slot != slot);
}
self.remove_programs_with_no_entries();
}
Expand Down Expand Up @@ -890,8 +887,7 @@ impl<FG: ForkGraph> ProgramCache<FG> {
// Remove entries un/re/deployed on orphan forks
let mut first_ancestor_found = false;
let mut first_ancestor_env = None;
second_level.slot_versions = second_level
.slot_versions
*second_level = second_level
.iter()
.rev()
.filter(|entry| {
Expand Down Expand Up @@ -940,7 +936,7 @@ impl<FG: ForkGraph> ProgramCache<FG> {
})
.cloned()
.collect();
second_level.slot_versions.reverse();
second_level.reverse();
}
self.remove_programs_with_no_entries();
debug_assert!(self.latest_root_slot <= new_root_slot);
Expand Down Expand Up @@ -974,7 +970,7 @@ impl<FG: ForkGraph> ProgramCache<FG> {
/// Extracts a subset of the programs relevant to a transaction batch
/// and returns which program accounts the accounts DB needs to load.
pub fn extract(
&mut self,
&self,
alessandrod marked this conversation as resolved.
Show resolved Hide resolved
search_for: &mut Vec<(Pubkey, (ProgramCacheMatchCriteria, u64))>,
loaded_programs_for_tx_batch: &mut ProgramCacheForTxBatch,
is_first_round: bool,
Expand All @@ -983,8 +979,8 @@ impl<FG: ForkGraph> ProgramCache<FG> {
let locked_fork_graph = self.fork_graph.as_ref().unwrap().read().unwrap();
let mut cooperative_loading_task = None;
search_for.retain(|(key, (match_criteria, usage_count))| {
if let Some(second_level) = self.entries.get_mut(key) {
for entry in second_level.slot_versions.iter().rev() {
if let Some(second_level) = self.entries.get(key) {
for entry in second_level.iter().rev() {
if entry.deployment_slot <= self.latest_root_slot
|| matches!(
locked_fork_graph.relationship(
Expand Down Expand Up @@ -1033,15 +1029,14 @@ impl<FG: ForkGraph> ProgramCache<FG> {
}
}
if cooperative_loading_task.is_none() {
// We have not selected a task so far
let second_level = self.entries.entry(*key).or_default();
if second_level.cooperative_loading_lock.is_none() {
// Select this missing entry which is not selected by any other TX batch yet
cooperative_loading_task = Some((*key, *usage_count));
second_level.cooperative_loading_lock = Some((
let mut loading_entries = self.loading_entries.lock().unwrap();
let entry = loading_entries.entry(*key);
if let Entry::Vacant(entry) = entry {
entry.insert((
loaded_programs_for_tx_batch.slot,
std::thread::current().id(),
));
cooperative_loading_task = Some((*key, *usage_count));
}
}
true
Expand All @@ -1066,12 +1061,8 @@ impl<FG: ForkGraph> ProgramCache<FG> {
key: Pubkey,
loaded_program: Arc<ProgramCacheEntry>,
) -> bool {
let second_level = self.entries.entry(key).or_default();
debug_assert_eq!(
second_level.cooperative_loading_lock,
Some((slot, std::thread::current().id()))
);
second_level.cooperative_loading_lock = None;
let loading_thread = self.loading_entries.lock().unwrap().remove(&key);
debug_assert_eq!(loading_thread, Some((slot, std::thread::current().id())));
// Check that it will be visible to our own fork once inserted
if loaded_program.deployment_slot > self.latest_root_slot
&& !matches!(
Expand Down Expand Up @@ -1107,7 +1098,6 @@ impl<FG: ForkGraph> ProgramCache<FG> {
.iter()
.flat_map(|(id, second_level)| {
second_level
.slot_versions
.iter()
.filter_map(move |program| match program.program {
ProgramCacheEntryType::Loaded(_) => {
Expand All @@ -1132,19 +1122,16 @@ impl<FG: ForkGraph> ProgramCache<FG> {
self.entries
.iter()
.flat_map(|(id, second_level)| {
second_level
.slot_versions
.iter()
.map(|program| (*id, program.clone()))
second_level.iter().map(|program| (*id, program.clone()))
})
.collect()
}

/// Returns the `slot_versions` of the second level for the given program id.
/// Returns the slot versions for the given program id.
pub fn get_slot_versions_for_tests(&self, key: &Pubkey) -> &[Arc<ProgramCacheEntry>] {
self.entries
.get(key)
.map(|second_level| second_level.slot_versions.as_ref())
.map(|second_level| second_level.as_ref())
.unwrap_or(&[])
}

Expand Down Expand Up @@ -1205,7 +1192,6 @@ impl<FG: ForkGraph> ProgramCache<FG> {
fn unload_program_entry(&mut self, program: &Pubkey, remove_entry: &Arc<ProgramCacheEntry>) {
let second_level = self.entries.get_mut(program).expect("Cache lookup failed");
let candidate = second_level
.slot_versions
.iter_mut()
.find(|entry| entry == &remove_entry)
.expect("Program entry not found");
Expand Down Expand Up @@ -1237,10 +1223,8 @@ impl<FG: ForkGraph> ProgramCache<FG> {

fn remove_programs_with_no_entries(&mut self) {
let num_programs_before_removal = self.entries.len();
self.entries.retain(|_, second_level| {
!second_level.slot_versions.is_empty()
|| second_level.cooperative_loading_lock.is_some()
});
self.entries
.retain(|_key, second_level| !second_level.is_empty());
if self.entries.len() < num_programs_before_removal {
self.stats.empty_entries.fetch_add(
num_programs_before_removal.saturating_sub(self.entries.len()) as u64,
Expand Down Expand Up @@ -2072,7 +2056,7 @@ mod tests {
keys.iter()
.filter_map(|key| {
let visible_entry = cache.entries.get(key).and_then(|second_level| {
second_level.slot_versions.iter().rev().find(|entry| {
second_level.iter().rev().find(|entry| {
matches!(
locked_fork_graph.relationship(entry.deployment_slot, loading_slot),
BlockRelation::Equal | BlockRelation::Ancestor,
Expand Down
73 changes: 37 additions & 36 deletions svm/src/transaction_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,10 @@ impl<FG: ForkGraph> TransactionBatchProcessor<FG> {
.collect();

let mut loaded_programs_for_txs = None;
let mut program_to_store = None;
loop {
let (program_to_load, task_cookie, task_waiter) = {
let (program_to_store, task_cookie, task_waiter) = {
// Lock the global cache.
let mut program_cache = self.program_cache.write().unwrap();
let program_cache = self.program_cache.read().unwrap();
// Initialize our local cache.
let is_first_round = loaded_programs_for_txs.is_none();
if is_first_round {
Expand All @@ -401,49 +400,51 @@ impl<FG: ForkGraph> TransactionBatchProcessor<FG> {
&program_cache,
));
}
// Submit our last completed loading task.
if let Some((key, program)) = program_to_store.take() {
loaded_programs_for_txs.as_mut().unwrap().loaded_missing = true;
Copy link

@Lichtso Lichtso May 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alessandrod I think you accidentally deleted this line here while moving the code. It was added in #1037 which was open around the same time as this PR, so most likely a merge conflict that was not resolved properly.

When I investigated the validator node of @buffalojoec (which went out of disk space as it was not configured to rotate log files) I discovered that the program cache was growing far beyond the capacity as it was not evicting anymore.

Can you open a PR to add the line back in?

I will add a metric for the number of loaded entries (complementary to the evictions metric we already have) recorded in ProgramCache::evict_using_2s_random_selection() so that this doesn't happen again.

if program_cache.finish_cooperative_loading_task(self.slot, key, program)
&& limit_to_load_programs
{
// This branch is taken when there is an error in assigning a program to a
// cache slot. It is not possible to mock this error for SVM unit
// tests purposes.
let mut ret = ProgramCacheForTxBatch::new_from_cache(
self.slot,
self.epoch,
&program_cache,
);
ret.hit_max_limit = true;
return ret;
}
}
// Figure out which program needs to be loaded next.
let program_to_load = program_cache.extract(
&mut missing_programs,
loaded_programs_for_txs.as_mut().unwrap(),
is_first_round,
);

let program_to_store = program_to_load.map(|(key, count)| {
// Load, verify and compile one program.
let program = load_program_with_pubkey(
callback,
&program_cache,
&key,
self.slot,
self.epoch,
&self.epoch_schedule,
false,
)
.expect("called load_program_with_pubkey() with nonexistent account");
program.tx_usage_counter.store(count, Ordering::Relaxed);
(key, program)
});

let task_waiter = Arc::clone(&program_cache.loading_task_waiter);
(program_to_load, task_waiter.cookie(), task_waiter)
(program_to_store, task_waiter.cookie(), task_waiter)
// Unlock the global cache again.
};

if let Some((key, count)) = program_to_load {
// Load, verify and compile one program.
let program = load_program_with_pubkey(
callback,
&self.program_cache.read().unwrap(),
&key,
self.slot,
self.epoch,
&self.epoch_schedule,
false,
)
.expect("called load_program_with_pubkey() with nonexistent account");
program.tx_usage_counter.store(count, Ordering::Relaxed);
program_to_store = Some((key, program));
if let Some((key, program)) = program_to_store {
let mut program_cache = self.program_cache.write().unwrap();
// Submit our last completed loading task.
if program_cache.finish_cooperative_loading_task(self.slot, key, program)
&& limit_to_load_programs
{
// This branch is taken when there is an error in assigning a program to a
// cache slot. It is not possible to mock this error for SVM unit
// tests purposes.
let mut ret = ProgramCacheForTxBatch::new_from_cache(
self.slot,
self.epoch,
&program_cache,
);
ret.hit_max_limit = true;
return ret;
}
} else if missing_programs.is_empty() {
break;
} else {
Expand Down
Loading