diff --git a/program-runtime/src/loaded_programs.rs b/program-runtime/src/loaded_programs.rs index c210b8d0e9b314..214c7e842c4f94 100644 --- a/program-runtime/src/loaded_programs.rs +++ b/program-runtime/src/loaded_programs.rs @@ -79,6 +79,8 @@ pub enum LoadedProgramType { #[default] Closed, DelayVisibility, + /// Currently enqueued in the cooperative loading phase + Loading(Mutex<(bool, Vec>>)>), /// Successfully verified but not currently compiled, used to track usage statistics when a compiled program is evicted from memory. Unloaded(ProgramRuntimeEnvironment), LegacyV0(Executable>), @@ -97,6 +99,7 @@ impl Debug for LoadedProgramType { } LoadedProgramType::Closed => write!(f, "LoadedProgramType::Closed"), LoadedProgramType::DelayVisibility => write!(f, "LoadedProgramType::DelayVisibility"), + LoadedProgramType::Loading(_) => write!(f, "LoadedProgramType::Loading"), LoadedProgramType::Unloaded(_) => write!(f, "LoadedProgramType::Unloaded"), LoadedProgramType::LegacyV0(_) => write!(f, "LoadedProgramType::LegacyV0"), LoadedProgramType::LegacyV1(_) => write!(f, "LoadedProgramType::LegacyV1"), @@ -275,6 +278,18 @@ impl LoadedProgram { ) } + fn new_loading() -> Self { + Self { + program: LoadedProgramType::Loading(Mutex::new((false, vec![]))), + account_size: 0, + deployment_slot: 0, + effective_slot: 0, + maybe_expiration_slot: None, + tx_usage_counter: AtomicU64::new(0), + ix_usage_counter: AtomicU64::new(0), + } + } + /// Reloads a user program, *without* running the verifier. /// /// # Safety @@ -511,7 +526,7 @@ pub struct LoadedProgramsForTxBatch { pub struct ExtractedPrograms { pub loaded: LoadedProgramsForTxBatch, - pub missing: HashMap, + pub missing: HashMap, bool)>, } impl LoadedProgramsForTxBatch { @@ -791,7 +806,7 @@ impl LoadedPrograms { /// Extracts a subset of the programs relevant to a transaction batch /// and returns which program accounts the accounts DB needs to load. pub fn extract( - &self, + &mut self, working_slot: &S, keys: impl Iterator, ) -> Arc> { @@ -808,9 +823,13 @@ impl LoadedPrograms { let mut extracting = extracted.lock().unwrap(); extracting.loaded.entries = keys .filter_map(|(key, (match_criteria, count))| { + let mut missing_entry = None; let mut reloading = false; if let Some(second_level) = self.entries.get(&key) { for entry in second_level.iter().rev() { + if let LoadedProgramType::Loading(_) = &entry.program { + continue; + } let is_ancestor = if let Some(fork_graph) = &self.fork_graph { fork_graph .read() @@ -825,7 +844,6 @@ impl LoadedPrograms { } else { working_slot.is_ancestor(entry.deployment_slot) }; - if entry.deployment_slot <= self.latest_root_slot || entry.deployment_slot == current_slot || is_ancestor @@ -842,15 +860,13 @@ impl LoadedPrograms { break; } - let mut usage_count = - entry.tx_usage_counter.load(Ordering::Relaxed); - saturating_add_assign!(usage_count, count); - entry.tx_usage_counter.store(usage_count, Ordering::Relaxed); + entry.tx_usage_counter.fetch_add(count, Ordering::Relaxed); return Some((key, entry.clone())); } else if entry.is_implicit_delay_visibility_tombstone(current_slot) { // Found a program entry on the current fork, but it's not effective // yet. It indicates that the program has delayed visibility. Return // the tombstone to reflect that. + entry.tx_usage_counter.fetch_add(count, Ordering::Relaxed); return Some(( key, Arc::new(LoadedProgram::new_tombstone( @@ -861,8 +877,15 @@ impl LoadedPrograms { } } } + if let Some(entry) = second_level.first() { + if let LoadedProgramType::Loading(_) = &entry.program { + missing_entry = Some(entry.clone()); + } + } } - extracting.missing.insert(key, (count, reloading)); + let entry = missing_entry.unwrap_or_else(|| Arc::new(LoadedProgram::new_loading())); + entry.tx_usage_counter.fetch_add(count, Ordering::Relaxed); + extracting.missing.insert(key, (entry, reloading)); None }) .collect::>>(); @@ -872,6 +895,23 @@ impl LoadedPrograms { self.stats .hits .fetch_add(extracting.loaded.entries.len() as u64, Ordering::Relaxed); + for (key, (entry, _reloading)) in extracting.missing.iter() { + if let LoadedProgramType::Loading(mutex) = &entry.program { + // The Mutex around `waiting_list` is strictly speaking unnecessary + // because the entire `LoadedPrograms` cache is already locked here. + let waiting_list = &mut mutex.lock().unwrap().1; + if waiting_list.is_empty() { + self.assign_program(*key, entry.clone()); + } + let index = match waiting_list.binary_search_by(|tx_batch| { + tx_batch.lock().unwrap().loaded.slot.cmp(¤t_slot) + }) { + Ok(index) => index, + Err(index) => index, + }; + waiting_list.insert(index, extracted.clone()); + } + } drop(extracting); extracted } @@ -1648,7 +1688,7 @@ mod tests { extracted .missing .get(key) - .filter(|(_count, reloading)| *reloading == reload) + .filter(|(_entry, reloading)| *reloading == reload) .is_some() } @@ -2257,7 +2297,7 @@ mod tests { .get(&program1) .expect("Didn't find program1") .len(), - 3 + 4 ); // New root 5 should not evict the expired entry for program1 @@ -2268,7 +2308,7 @@ mod tests { .get(&program1) .expect("Didn't find program1") .len(), - 1 + 2 ); // New root 15 should evict the expired entry for program1 diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 58d08825b9998c..6f11c38700c463 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -5073,7 +5073,7 @@ impl Bank { missing, } = { // Lock the global cache to figure out which programs need to be loaded - let loaded_programs_cache = self.loaded_programs_cache.read().unwrap(); + let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap(); Mutex::into_inner( Arc::into_inner( loaded_programs_cache.extract(self, programs_and_slots.into_iter()), @@ -5086,9 +5086,12 @@ impl Bank { // Load missing programs while global cache is unlocked let missing_programs: Vec<(Pubkey, Arc)> = missing .iter() - .map(|(key, (count, reloading))| { + .map(|(key, (entry, reloading))| { let program = self.load_program(key, *reloading, None); - program.tx_usage_counter.store(*count, Ordering::Relaxed); + program.tx_usage_counter.store( + entry.tx_usage_counter.load(Ordering::Relaxed), + Ordering::Relaxed, + ); (*key, program) }) .collect();