diff --git a/program-runtime/src/loaded_programs.rs b/program-runtime/src/loaded_programs.rs index 2ad14a712e9cfb..37413474a4cb65 100644 --- a/program-runtime/src/loaded_programs.rs +++ b/program-runtime/src/loaded_programs.rs @@ -578,6 +578,27 @@ impl LoadingTaskWaiter { } } +#[derive(Debug)] +enum IndexImplementation { + /// Fork-graph aware index implementation + V1 { + /// A two level index: + /// + /// - the first level is for the address at which programs are deployed + /// - the second level for the slot (and thus also fork), sorted by slot number. + entries: HashMap>>, + /// The entries that are getting loaded and have not yet finished loading. + /// + /// The key is the program address, the value is a tuple of the slot in which the program is + /// being loaded and the thread ID doing the load. + /// + /// It is possible that multiple TX batches from different slots need different versions of a + /// program. The deployment slot of a program is only known after load tho, + /// so all loads for a given program key are serialized. + loading_entries: Mutex>, + }, +} + /// This structure is the global cache of loaded, verified and compiled programs. /// /// It ... @@ -594,20 +615,8 @@ impl LoadingTaskWaiter { /// - enforces that all programs used in a batch are eagerly loaded ahead of execution. /// - is not persisted to disk or a snapshot, so it needs to cold start and warm up first. pub struct ProgramCache { - /// A two level index: - /// - /// - the first level is for the address at which programs are deployed - /// - the second level for the slot (and thus also fork), sorted by slot number. - entries: HashMap>>, - /// The entries that are getting loaded and have not yet finished loading. - /// - /// The key is the program address, the value is a tuple of the slot in which the program is - /// being loaded and the thread ID doing the load. - /// - /// It is possible that multiple TX batches from different slots need different versions of a - /// program. The deployment slot of a program is only known after load tho, - /// so all loads for a given program key are serialized. - loading_entries: Mutex>, + /// Index of the cached entries and cooperative loading tasks + index: IndexImplementation, /// The slot of the last rerooting pub latest_root_slot: Slot, /// The epoch of the last rerooting @@ -636,7 +645,7 @@ impl Debug for ProgramCache { .field("root slot", &self.latest_root_slot) .field("root epoch", &self.latest_root_epoch) .field("stats", &self.stats) - .field("cache", &self.entries) + .field("index", &self.index) .finish() } } @@ -773,8 +782,10 @@ pub enum ProgramCacheMatchCriteria { impl ProgramCache { pub fn new(root_slot: Slot, root_epoch: Epoch) -> Self { Self { - entries: HashMap::new(), - loading_entries: Mutex::new(HashMap::new()), + index: IndexImplementation::V1 { + entries: HashMap::new(), + loading_entries: Mutex::new(HashMap::new()), + }, latest_root_slot: root_slot, latest_root_epoch: root_epoch, environments: ProgramRuntimeEnvironments::default(), @@ -832,58 +843,75 @@ impl ProgramCache { }) .unwrap_or(true) } - let slot_versions = &mut self.entries.entry(key).or_default(); - match slot_versions.binary_search_by(|at| { - at.effective_slot - .cmp(&entry.effective_slot) - .then(at.deployment_slot.cmp(&entry.deployment_slot)) - .then( - // This `.then()` has no effect during normal operation. - // Only during the recompilation phase this does allow entries - // which only differ in their environment to be interleaved in `slot_versions`. - is_current_env(&self.environments, at.program.get_environment()).cmp( - &is_current_env(&self.environments, entry.program.get_environment()), - ), - ) - }) { - Ok(index) => { - let existing = slot_versions.get_mut(index).unwrap(); - match (&existing.program, &entry.program) { - (ProgramCacheEntryType::Builtin(_), ProgramCacheEntryType::Builtin(_)) - | (ProgramCacheEntryType::Unloaded(_), ProgramCacheEntryType::Loaded(_)) => {} - _ => { - // Something is wrong, I can feel it ... - error!("ProgramCache::assign_program() failed key={:?} existing={:?} entry={:?}", key, slot_versions, entry); - debug_assert!(false, "Unexpected replacement of an entry"); - self.stats.replacements.fetch_add(1, Ordering::Relaxed); - return true; + match &mut self.index { + IndexImplementation::V1 { entries, .. } => { + let slot_versions = &mut entries.entry(key).or_default(); + match slot_versions.binary_search_by(|at| { + at.effective_slot + .cmp(&entry.effective_slot) + .then(at.deployment_slot.cmp(&entry.deployment_slot)) + .then( + // This `.then()` has no effect during normal operation. + // Only during the recompilation phase this does allow entries + // which only differ in their environment to be interleaved in `slot_versions`. + is_current_env(&self.environments, at.program.get_environment()).cmp( + &is_current_env( + &self.environments, + entry.program.get_environment(), + ), + ), + ) + }) { + Ok(index) => { + let existing = slot_versions.get_mut(index).unwrap(); + match (&existing.program, &entry.program) { + ( + ProgramCacheEntryType::Builtin(_), + ProgramCacheEntryType::Builtin(_), + ) + | ( + ProgramCacheEntryType::Unloaded(_), + ProgramCacheEntryType::Loaded(_), + ) => {} + _ => { + // Something is wrong, I can feel it ... + error!("ProgramCache::assign_program() failed key={:?} existing={:?} entry={:?}", key, slot_versions, entry); + debug_assert!(false, "Unexpected replacement of an entry"); + self.stats.replacements.fetch_add(1, Ordering::Relaxed); + return true; + } + } + // Copy over the usage counter to the new entry + entry.tx_usage_counter.fetch_add( + existing.tx_usage_counter.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + entry.ix_usage_counter.fetch_add( + existing.ix_usage_counter.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + *existing = Arc::clone(&entry); + self.stats.reloads.fetch_add(1, Ordering::Relaxed); + } + Err(index) => { + self.stats.insertions.fetch_add(1, Ordering::Relaxed); + slot_versions.insert(index, Arc::clone(&entry)); } } - // Copy over the usage counter to the new entry - entry.tx_usage_counter.fetch_add( - existing.tx_usage_counter.load(Ordering::Relaxed), - Ordering::Relaxed, - ); - entry.ix_usage_counter.fetch_add( - existing.ix_usage_counter.load(Ordering::Relaxed), - Ordering::Relaxed, - ); - *existing = Arc::clone(&entry); - self.stats.reloads.fetch_add(1, Ordering::Relaxed); - } - Err(index) => { - self.stats.insertions.fetch_add(1, Ordering::Relaxed); - slot_versions.insert(index, Arc::clone(&entry)); } } false } pub fn prune_by_deployment_slot(&mut self, slot: Slot) { - for second_level in self.entries.values_mut() { - second_level.retain(|entry| entry.deployment_slot != slot); + match &mut self.index { + IndexImplementation::V1 { entries, .. } => { + for second_level in entries.values_mut() { + second_level.retain(|entry| entry.deployment_slot != slot); + } + self.remove_programs_with_no_entries(); + } } - self.remove_programs_with_no_entries(); } /// Before rerooting the blockstore this removes all superfluous entries @@ -905,60 +933,65 @@ impl ProgramCache { self.programs_to_recompile.clear(); } } - for second_level in self.entries.values_mut() { - // Remove entries un/re/deployed on orphan forks - let mut first_ancestor_found = false; - let mut first_ancestor_env = None; - *second_level = second_level - .iter() - .rev() - .filter(|entry| { - let relation = fork_graph.relationship(entry.deployment_slot, new_root_slot); - if entry.deployment_slot >= new_root_slot { - matches!(relation, BlockRelation::Equal | BlockRelation::Descendant) - } else if matches!(relation, BlockRelation::Ancestor) - || entry.deployment_slot <= self.latest_root_slot - { - if !first_ancestor_found { - first_ancestor_found = true; - first_ancestor_env = entry.program.get_environment(); - return true; - } - // Do not prune the entry if the runtime environment of the entry is different - // than the entry that was previously found (stored in first_ancestor_env). - // Different environment indicates that this entry might belong to an older - // epoch that had a different environment (e.g. different feature set). - // Once the root moves to the new/current epoch, the entry will get pruned. - // But, until then the entry might still be getting used by an older slot. - if let Some(entry_env) = entry.program.get_environment() { - if let Some(env) = first_ancestor_env { - if !Arc::ptr_eq(entry_env, env) { + match &mut self.index { + IndexImplementation::V1 { entries, .. } => { + for second_level in entries.values_mut() { + // Remove entries un/re/deployed on orphan forks + let mut first_ancestor_found = false; + let mut first_ancestor_env = None; + *second_level = second_level + .iter() + .rev() + .filter(|entry| { + let relation = + fork_graph.relationship(entry.deployment_slot, new_root_slot); + if entry.deployment_slot >= new_root_slot { + matches!(relation, BlockRelation::Equal | BlockRelation::Descendant) + } else if matches!(relation, BlockRelation::Ancestor) + || entry.deployment_slot <= self.latest_root_slot + { + if !first_ancestor_found { + first_ancestor_found = true; + first_ancestor_env = entry.program.get_environment(); return true; } + // Do not prune the entry if the runtime environment of the entry is different + // than the entry that was previously found (stored in first_ancestor_env). + // Different environment indicates that this entry might belong to an older + // epoch that had a different environment (e.g. different feature set). + // Once the root moves to the new/current epoch, the entry will get pruned. + // But, until then the entry might still be getting used by an older slot. + if let Some(entry_env) = entry.program.get_environment() { + if let Some(env) = first_ancestor_env { + if !Arc::ptr_eq(entry_env, env) { + return true; + } + } + } + self.stats.prunes_orphan.fetch_add(1, Ordering::Relaxed); + false + } else { + self.stats.prunes_orphan.fetch_add(1, Ordering::Relaxed); + false } - } - self.stats.prunes_orphan.fetch_add(1, Ordering::Relaxed); - false - } else { - self.stats.prunes_orphan.fetch_add(1, Ordering::Relaxed); - false - } - }) - .filter(|entry| { - // Remove outdated environment of previous feature set - if recompilation_phase_ends - && !Self::matches_environment(entry, &self.environments) - { - self.stats - .prunes_environment - .fetch_add(1, Ordering::Relaxed); - return false; - } - true - }) - .cloned() - .collect(); - second_level.reverse(); + }) + .filter(|entry| { + // Remove outdated environment of previous feature set + if recompilation_phase_ends + && !Self::matches_environment(entry, &self.environments) + { + self.stats + .prunes_environment + .fetch_add(1, Ordering::Relaxed); + return false; + } + true + }) + .cloned() + .collect(); + second_level.reverse(); + } + } } self.remove_programs_with_no_entries(); debug_assert!(self.latest_root_slot <= new_root_slot); @@ -1000,69 +1033,79 @@ impl ProgramCache { debug_assert!(self.fork_graph.is_some()); let locked_fork_graph = self.fork_graph.as_ref().unwrap().read().unwrap(); let mut cooperative_loading_task = None; - search_for.retain(|(key, (match_criteria, usage_count))| { - if let Some(second_level) = self.entries.get(key) { - for entry in second_level.iter().rev() { - if entry.deployment_slot <= self.latest_root_slot - || matches!( - locked_fork_graph.relationship( - entry.deployment_slot, - loaded_programs_for_tx_batch.slot - ), - BlockRelation::Equal | BlockRelation::Ancestor - ) - { - let entry_to_return = if loaded_programs_for_tx_batch.slot - >= entry.effective_slot - && Self::matches_environment( - entry, - &loaded_programs_for_tx_batch.environments, - ) { - if !Self::matches_criteria(entry, match_criteria) { - break; - } - if let ProgramCacheEntryType::Unloaded(_environment) = &entry.program { - break; + match &self.index { + IndexImplementation::V1 { + entries, + loading_entries, + } => { + search_for.retain(|(key, (match_criteria, usage_count))| { + if let Some(second_level) = entries.get(key) { + for entry in second_level.iter().rev() { + if entry.deployment_slot <= self.latest_root_slot + || matches!( + locked_fork_graph.relationship( + entry.deployment_slot, + loaded_programs_for_tx_batch.slot + ), + BlockRelation::Equal | BlockRelation::Ancestor + ) + { + let entry_to_return = if loaded_programs_for_tx_batch.slot + >= entry.effective_slot + && Self::matches_environment( + entry, + &loaded_programs_for_tx_batch.environments, + ) { + if !Self::matches_criteria(entry, match_criteria) { + break; + } + if let ProgramCacheEntryType::Unloaded(_environment) = + &entry.program + { + break; + } + entry.clone() + } else if entry.is_implicit_delay_visibility_tombstone( + loaded_programs_for_tx_batch.slot, + ) { + // Found a program entry on the current fork, but it's not effective + // yet. It indicates that the program has delayed visibility. Return + // the tombstone to reflect that. + Arc::new(ProgramCacheEntry::new_tombstone( + entry.deployment_slot, + entry.account_owner, + ProgramCacheEntryType::DelayVisibility, + )) + } else { + continue; + }; + entry_to_return + .update_access_slot(loaded_programs_for_tx_batch.slot); + entry_to_return + .tx_usage_counter + .fetch_add(*usage_count, Ordering::Relaxed); + loaded_programs_for_tx_batch + .entries + .insert(*key, entry_to_return); + return false; } - entry.clone() - } else if entry.is_implicit_delay_visibility_tombstone( - loaded_programs_for_tx_batch.slot, - ) { - // Found a program entry on the current fork, but it's not effective - // yet. It indicates that the program has delayed visibility. Return - // the tombstone to reflect that. - Arc::new(ProgramCacheEntry::new_tombstone( - entry.deployment_slot, - entry.account_owner, - ProgramCacheEntryType::DelayVisibility, - )) - } else { - continue; - }; - entry_to_return.update_access_slot(loaded_programs_for_tx_batch.slot); - entry_to_return - .tx_usage_counter - .fetch_add(*usage_count, Ordering::Relaxed); - loaded_programs_for_tx_batch - .entries - .insert(*key, entry_to_return); - return false; + } } - } - } - if cooperative_loading_task.is_none() { - let mut loading_entries = self.loading_entries.lock().unwrap(); - let entry = loading_entries.entry(*key); - if let Entry::Vacant(entry) = entry { - entry.insert(( - loaded_programs_for_tx_batch.slot, - std::thread::current().id(), - )); - cooperative_loading_task = Some((*key, *usage_count)); - } + if cooperative_loading_task.is_none() { + let mut loading_entries = loading_entries.lock().unwrap(); + let entry = loading_entries.entry(*key); + if let Entry::Vacant(entry) = entry { + entry.insert(( + loaded_programs_for_tx_batch.slot, + std::thread::current().id(), + )); + cooperative_loading_task = Some((*key, *usage_count)); + } + } + true + }); } - true - }); + } drop(locked_fork_graph); if is_first_round { self.stats @@ -1083,25 +1126,31 @@ impl ProgramCache { key: Pubkey, loaded_program: Arc, ) -> bool { - let loading_thread = self.loading_entries.lock().unwrap().remove(&key); - debug_assert_eq!(loading_thread, Some((slot, std::thread::current().id()))); - // Check that it will be visible to our own fork once inserted - if loaded_program.deployment_slot > self.latest_root_slot - && !matches!( - self.fork_graph - .as_ref() - .unwrap() - .read() - .unwrap() - .relationship(loaded_program.deployment_slot, slot), - BlockRelation::Equal | BlockRelation::Ancestor - ) - { - self.stats.lost_insertions.fetch_add(1, Ordering::Relaxed); + match &mut self.index { + IndexImplementation::V1 { + loading_entries, .. + } => { + let loading_thread = loading_entries.get_mut().unwrap().remove(&key); + debug_assert_eq!(loading_thread, Some((slot, std::thread::current().id()))); + // Check that it will be visible to our own fork once inserted + if loaded_program.deployment_slot > self.latest_root_slot + && !matches!( + self.fork_graph + .as_ref() + .unwrap() + .read() + .unwrap() + .relationship(loaded_program.deployment_slot, slot), + BlockRelation::Equal | BlockRelation::Ancestor + ) + { + self.stats.lost_insertions.fetch_add(1, Ordering::Relaxed); + } + let was_occupied = self.assign_program(key, loaded_program); + self.loading_task_waiter.notify(); + was_occupied + } } - let was_occupied = self.assign_program(key, loaded_program); - self.loading_task_waiter.notify(); - was_occupied } pub fn merge(&mut self, tx_batch_cache: &ProgramCacheForTxBatch) { @@ -1116,45 +1165,51 @@ impl ProgramCache { include_program_runtime_v1: bool, include_program_runtime_v2: bool, ) -> Vec<(Pubkey, Arc)> { - self.entries - .iter() - .flat_map(|(id, second_level)| { - second_level - .iter() - .filter_map(move |program| match program.program { - ProgramCacheEntryType::Loaded(_) => { - if (program.account_owner != ProgramCacheEntryOwner::LoaderV4 - && include_program_runtime_v1) - || (program.account_owner == ProgramCacheEntryOwner::LoaderV4 - && include_program_runtime_v2) - { - Some((*id, program.clone())) - } else { - None + match &self.index { + IndexImplementation::V1 { entries, .. } => entries + .iter() + .flat_map(|(id, second_level)| { + second_level + .iter() + .filter_map(move |program| match program.program { + ProgramCacheEntryType::Loaded(_) => { + if (program.account_owner != ProgramCacheEntryOwner::LoaderV4 + && include_program_runtime_v1) + || (program.account_owner == ProgramCacheEntryOwner::LoaderV4 + && include_program_runtime_v2) + { + Some((*id, program.clone())) + } else { + None + } } - } - _ => None, - }) - }) - .collect() + _ => None, + }) + }) + .collect(), + } } /// Returns the list of all entries in the cache. pub fn get_flattened_entries_for_tests(&self) -> Vec<(Pubkey, Arc)> { - self.entries - .iter() - .flat_map(|(id, second_level)| { - second_level.iter().map(|program| (*id, program.clone())) - }) - .collect() + match &self.index { + IndexImplementation::V1 { entries, .. } => entries + .iter() + .flat_map(|(id, second_level)| { + second_level.iter().map(|program| (*id, program.clone())) + }) + .collect(), + } } /// Returns the slot versions for the given program id. pub fn get_slot_versions_for_tests(&self, key: &Pubkey) -> &[Arc] { - self.entries - .get(key) - .map(|second_level| second_level.as_ref()) - .unwrap_or(&[]) + match &self.index { + IndexImplementation::V1 { entries, .. } => entries + .get(key) + .map(|second_level| second_level.as_ref()) + .unwrap_or(&[]), + } } /// Unloads programs which were used infrequently @@ -1204,33 +1259,41 @@ impl ProgramCache { /// Removes all the entries at the given keys, if they exist pub fn remove_programs(&mut self, keys: impl Iterator) { - for k in keys { - self.entries.remove(&k); + match &mut self.index { + IndexImplementation::V1 { entries, .. } => { + for k in keys { + entries.remove(&k); + } + } } } /// This function removes the given entry for the given program from the cache. /// The function expects that the program and entry exists in the cache. Otherwise it'll panic. fn unload_program_entry(&mut self, program: &Pubkey, remove_entry: &Arc) { - let second_level = self.entries.get_mut(program).expect("Cache lookup failed"); - let candidate = second_level - .iter_mut() - .find(|entry| entry == &remove_entry) - .expect("Program entry not found"); - - // Certain entry types cannot be unloaded, such as tombstones, or already unloaded entries. - // For such entries, `to_unloaded()` will return None. - // These entry types do not occupy much memory. - if let Some(unloaded) = candidate.to_unloaded() { - if candidate.tx_usage_counter.load(Ordering::Relaxed) == 1 { - self.stats.one_hit_wonders.fetch_add(1, Ordering::Relaxed); + match &mut self.index { + IndexImplementation::V1 { entries, .. } => { + let second_level = entries.get_mut(program).expect("Cache lookup failed"); + let candidate = second_level + .iter_mut() + .find(|entry| entry == &remove_entry) + .expect("Program entry not found"); + + // Certain entry types cannot be unloaded, such as tombstones, or already unloaded entries. + // For such entries, `to_unloaded()` will return None. + // These entry types do not occupy much memory. + if let Some(unloaded) = candidate.to_unloaded() { + if candidate.tx_usage_counter.load(Ordering::Relaxed) == 1 { + self.stats.one_hit_wonders.fetch_add(1, Ordering::Relaxed); + } + self.stats + .evictions + .entry(*program) + .and_modify(|c| saturating_add_assign!(*c, 1)) + .or_insert(1); + *candidate = Arc::new(unloaded); + } } - self.stats - .evictions - .entry(*program) - .and_modify(|c| saturating_add_assign!(*c, 1)) - .or_insert(1); - *candidate = Arc::new(unloaded); } } @@ -1244,14 +1307,17 @@ impl ProgramCache { } fn remove_programs_with_no_entries(&mut self) { - let num_programs_before_removal = self.entries.len(); - self.entries - .retain(|_key, second_level| !second_level.is_empty()); - if self.entries.len() < num_programs_before_removal { - self.stats.empty_entries.fetch_add( - num_programs_before_removal.saturating_sub(self.entries.len()) as u64, - Ordering::Relaxed, - ); + match &mut self.index { + IndexImplementation::V1 { entries, .. } => { + let num_programs_before_removal = entries.len(); + entries.retain(|_key, second_level| !second_level.is_empty()); + if entries.len() < num_programs_before_removal { + self.stats.empty_entries.fetch_add( + num_programs_before_removal.saturating_sub(entries.len()) as u64, + Ordering::Relaxed, + ); + } + } } } } @@ -2075,17 +2141,22 @@ mod tests { keys: &[Pubkey], ) -> Vec<(Pubkey, (ProgramCacheMatchCriteria, u64))> { let locked_fork_graph = cache.fork_graph.as_ref().unwrap().read().unwrap(); + let entries = cache.get_flattened_entries_for_tests(); keys.iter() .filter_map(|key| { - let visible_entry = cache.entries.get(key).and_then(|second_level| { - second_level.iter().rev().find(|entry| { - matches!( - locked_fork_graph.relationship(entry.deployment_slot, loading_slot), - BlockRelation::Equal | BlockRelation::Ancestor, - ) + entries + .iter() + .rev() + .find(|(program_id, entry)| { + program_id == key + && matches!( + locked_fork_graph.relationship(entry.deployment_slot, loading_slot), + BlockRelation::Equal | BlockRelation::Ancestor, + ) + }) + .map(|(program_id, _entry)| { + (*program_id, (ProgramCacheMatchCriteria::NoCriteria, 1)) }) - }); - visible_entry.map(|_| (*key, (ProgramCacheMatchCriteria::NoCriteria, 1))) }) .collect() }