diff --git a/program-runtime/src/loaded_programs.rs b/program-runtime/src/loaded_programs.rs index 6bed0acc573fae..8af784bf00dcac 100644 --- a/program-runtime/src/loaded_programs.rs +++ b/program-runtime/src/loaded_programs.rs @@ -829,50 +829,64 @@ impl ProgramCache { &entry.program, ProgramCacheEntryType::DelayVisibility )); - let slot_versions = &mut self.entries.entry(key).or_default(); - match slot_versions.binary_search_by(|at| { - at.effective_slot - .cmp(&entry.effective_slot) - .then(at.deployment_slot.cmp(&entry.deployment_slot)) - }) { - Ok(index) => { - let existing = slot_versions.get_mut(index).unwrap(); - match (&existing.program, &entry.program) { - (ProgramCacheEntryType::Builtin(_), ProgramCacheEntryType::Builtin(_)) - | (ProgramCacheEntryType::Unloaded(_), ProgramCacheEntryType::Loaded(_)) => {} - _ => { - // Something is wrong, I can feel it ... - error!("ProgramCache::assign_program() failed key={:?} existing={:?} entry={:?}", key, slot_versions, entry); - debug_assert!(false, "Unexpected replacement of an entry"); - self.stats.replacements.fetch_add(1, Ordering::Relaxed); - return true; + match &mut self.index { + IndexImplementation::V1 { entries, .. } => { + let slot_versions = &mut entries.entry(key).or_default(); + match slot_versions.binary_search_by(|at| { + at.effective_slot + .cmp(&entry.effective_slot) + .then(at.deployment_slot.cmp(&entry.deployment_slot)) + }) { + Ok(index) => { + let existing = slot_versions.get_mut(index).unwrap(); + match (&existing.program, &entry.program) { + ( + ProgramCacheEntryType::Builtin(_), + ProgramCacheEntryType::Builtin(_), + ) + | ( + ProgramCacheEntryType::Unloaded(_), + ProgramCacheEntryType::Loaded(_), + ) => {} + _ => { + // Something is wrong, I can feel it ... + error!("ProgramCache::assign_program() failed key={:?} existing={:?} entry={:?}", key, slot_versions, entry); + debug_assert!(false, "Unexpected replacement of an entry"); + self.stats.replacements.fetch_add(1, Ordering::Relaxed); + return true; + } + } + // Copy over the usage counter to the new entry + entry.tx_usage_counter.fetch_add( + existing.tx_usage_counter.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + entry.ix_usage_counter.fetch_add( + existing.ix_usage_counter.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + *existing = Arc::clone(&entry); + self.stats.reloads.fetch_add(1, Ordering::Relaxed); + } + Err(index) => { + self.stats.insertions.fetch_add(1, Ordering::Relaxed); + slot_versions.insert(index, Arc::clone(&entry)); } } - // Copy over the usage counter to the new entry - entry.tx_usage_counter.fetch_add( - existing.tx_usage_counter.load(Ordering::Relaxed), - Ordering::Relaxed, - ); - entry.ix_usage_counter.fetch_add( - existing.ix_usage_counter.load(Ordering::Relaxed), - Ordering::Relaxed, - ); - *existing = Arc::clone(&entry); - self.stats.reloads.fetch_add(1, Ordering::Relaxed); - } - Err(index) => { - self.stats.insertions.fetch_add(1, Ordering::Relaxed); - slot_versions.insert(index, Arc::clone(&entry)); } } false } pub fn prune_by_deployment_slot(&mut self, slot: Slot) { - for second_level in self.entries.values_mut() { - second_level.retain(|entry| entry.deployment_slot != slot); + match &mut self.index { + IndexImplementation::V1 { entries, .. } => { + for second_level in entries.values_mut() { + second_level.retain(|entry| entry.deployment_slot != slot); + } + self.remove_programs_with_no_entries(); + } } - self.remove_programs_with_no_entries(); } /// Before rerooting the blockstore this removes all superfluous entries @@ -894,60 +908,65 @@ impl ProgramCache { self.programs_to_recompile.clear(); } } - for second_level in self.entries.values_mut() { - // Remove entries un/re/deployed on orphan forks - let mut first_ancestor_found = false; - let mut first_ancestor_env = None; - *second_level = second_level - .iter() - .rev() - .filter(|entry| { - let relation = fork_graph.relationship(entry.deployment_slot, new_root_slot); - if entry.deployment_slot >= new_root_slot { - matches!(relation, BlockRelation::Equal | BlockRelation::Descendant) - } else if matches!(relation, BlockRelation::Ancestor) - || entry.deployment_slot <= self.latest_root_slot - { - if !first_ancestor_found { - first_ancestor_found = true; - first_ancestor_env = entry.program.get_environment(); - return true; - } - // Do not prune the entry if the runtime environment of the entry is different - // than the entry that was previously found (stored in first_ancestor_env). - // Different environment indicates that this entry might belong to an older - // epoch that had a different environment (e.g. different feature set). - // Once the root moves to the new/current epoch, the entry will get pruned. - // But, until then the entry might still be getting used by an older slot. - if let Some(entry_env) = entry.program.get_environment() { - if let Some(env) = first_ancestor_env { - if !Arc::ptr_eq(entry_env, env) { + match &mut self.index { + IndexImplementation::V1 { entries, .. } => { + for second_level in entries.values_mut() { + // Remove entries un/re/deployed on orphan forks + let mut first_ancestor_found = false; + let mut first_ancestor_env = None; + *second_level = second_level + .iter() + .rev() + .filter(|entry| { + let relation = + fork_graph.relationship(entry.deployment_slot, new_root_slot); + if entry.deployment_slot >= new_root_slot { + matches!(relation, BlockRelation::Equal | BlockRelation::Descendant) + } else if matches!(relation, BlockRelation::Ancestor) + || entry.deployment_slot <= self.latest_root_slot + { + if !first_ancestor_found { + first_ancestor_found = true; + first_ancestor_env = entry.program.get_environment(); return true; } + // Do not prune the entry if the runtime environment of the entry is different + // than the entry that was previously found (stored in first_ancestor_env). + // Different environment indicates that this entry might belong to an older + // epoch that had a different environment (e.g. different feature set). + // Once the root moves to the new/current epoch, the entry will get pruned. + // But, until then the entry might still be getting used by an older slot. + if let Some(entry_env) = entry.program.get_environment() { + if let Some(env) = first_ancestor_env { + if !Arc::ptr_eq(entry_env, env) { + return true; + } + } + } + self.stats.prunes_orphan.fetch_add(1, Ordering::Relaxed); + false + } else { + self.stats.prunes_orphan.fetch_add(1, Ordering::Relaxed); + false } - } - self.stats.prunes_orphan.fetch_add(1, Ordering::Relaxed); - false - } else { - self.stats.prunes_orphan.fetch_add(1, Ordering::Relaxed); - false - } - }) - .filter(|entry| { - // Remove outdated environment of previous feature set - if recompilation_phase_ends - && !Self::matches_environment(entry, &self.environments) - { - self.stats - .prunes_environment - .fetch_add(1, Ordering::Relaxed); - return false; - } - true - }) - .cloned() - .collect(); - second_level.reverse(); + }) + .filter(|entry| { + // Remove outdated environment of previous feature set + if recompilation_phase_ends + && !Self::matches_environment(entry, &self.environments) + { + self.stats + .prunes_environment + .fetch_add(1, Ordering::Relaxed); + return false; + } + true + }) + .cloned() + .collect(); + second_level.reverse(); + } + } } self.remove_programs_with_no_entries(); debug_assert!(self.latest_root_slot <= new_root_slot); @@ -989,69 +1008,79 @@ impl ProgramCache { debug_assert!(self.fork_graph.is_some()); let locked_fork_graph = self.fork_graph.as_ref().unwrap().read().unwrap(); let mut cooperative_loading_task = None; - search_for.retain(|(key, (match_criteria, usage_count))| { - if let Some(second_level) = self.entries.get(key) { - for entry in second_level.iter().rev() { - if entry.deployment_slot <= self.latest_root_slot - || matches!( - locked_fork_graph.relationship( - entry.deployment_slot, - loaded_programs_for_tx_batch.slot - ), - BlockRelation::Equal | BlockRelation::Ancestor - ) - { - let entry_to_return = if loaded_programs_for_tx_batch.slot - >= entry.effective_slot - && Self::matches_environment( - entry, - &loaded_programs_for_tx_batch.environments, - ) { - if !Self::matches_criteria(entry, match_criteria) { - break; - } - if let ProgramCacheEntryType::Unloaded(_environment) = &entry.program { - break; + match &self.index { + IndexImplementation::V1 { + entries, + loading_entries, + } => { + search_for.retain(|(key, (match_criteria, usage_count))| { + if let Some(second_level) = entries.get(key) { + for entry in second_level.iter().rev() { + if entry.deployment_slot <= self.latest_root_slot + || matches!( + locked_fork_graph.relationship( + entry.deployment_slot, + loaded_programs_for_tx_batch.slot + ), + BlockRelation::Equal | BlockRelation::Ancestor + ) + { + let entry_to_return = if loaded_programs_for_tx_batch.slot + >= entry.effective_slot + && Self::matches_environment( + entry, + &loaded_programs_for_tx_batch.environments, + ) { + if !Self::matches_criteria(entry, match_criteria) { + break; + } + if let ProgramCacheEntryType::Unloaded(_environment) = + &entry.program + { + break; + } + entry.clone() + } else if entry.is_implicit_delay_visibility_tombstone( + loaded_programs_for_tx_batch.slot, + ) { + // Found a program entry on the current fork, but it's not effective + // yet. It indicates that the program has delayed visibility. Return + // the tombstone to reflect that. + Arc::new(ProgramCacheEntry::new_tombstone( + entry.deployment_slot, + entry.account_owner, + ProgramCacheEntryType::DelayVisibility, + )) + } else { + continue; + }; + entry_to_return + .update_access_slot(loaded_programs_for_tx_batch.slot); + entry_to_return + .tx_usage_counter + .fetch_add(*usage_count, Ordering::Relaxed); + loaded_programs_for_tx_batch + .entries + .insert(*key, entry_to_return); + return false; } - entry.clone() - } else if entry.is_implicit_delay_visibility_tombstone( - loaded_programs_for_tx_batch.slot, - ) { - // Found a program entry on the current fork, but it's not effective - // yet. It indicates that the program has delayed visibility. Return - // the tombstone to reflect that. - Arc::new(ProgramCacheEntry::new_tombstone( - entry.deployment_slot, - entry.account_owner, - ProgramCacheEntryType::DelayVisibility, - )) - } else { - continue; - }; - entry_to_return.update_access_slot(loaded_programs_for_tx_batch.slot); - entry_to_return - .tx_usage_counter - .fetch_add(*usage_count, Ordering::Relaxed); - loaded_programs_for_tx_batch - .entries - .insert(*key, entry_to_return); - return false; + } } - } - } - if cooperative_loading_task.is_none() { - let mut loading_entries = self.loading_entries.lock().unwrap(); - let entry = loading_entries.entry(*key); - if let Entry::Vacant(entry) = entry { - entry.insert(( - loaded_programs_for_tx_batch.slot, - std::thread::current().id(), - )); - cooperative_loading_task = Some((*key, *usage_count)); - } + if cooperative_loading_task.is_none() { + let mut loading_entries = loading_entries.lock().unwrap(); + let entry = loading_entries.entry(*key); + if let Entry::Vacant(entry) = entry { + entry.insert(( + loaded_programs_for_tx_batch.slot, + std::thread::current().id(), + )); + cooperative_loading_task = Some((*key, *usage_count)); + } + } + true + }); } - true - }); + } drop(locked_fork_graph); if is_first_round { self.stats @@ -1072,25 +1101,31 @@ impl ProgramCache { key: Pubkey, loaded_program: Arc, ) -> bool { - let loading_thread = self.loading_entries.lock().unwrap().remove(&key); - debug_assert_eq!(loading_thread, Some((slot, std::thread::current().id()))); - // Check that it will be visible to our own fork once inserted - if loaded_program.deployment_slot > self.latest_root_slot - && !matches!( - self.fork_graph - .as_ref() - .unwrap() - .read() - .unwrap() - .relationship(loaded_program.deployment_slot, slot), - BlockRelation::Equal | BlockRelation::Ancestor - ) - { - self.stats.lost_insertions.fetch_add(1, Ordering::Relaxed); + match &mut self.index { + IndexImplementation::V1 { + loading_entries, .. + } => { + let loading_thread = loading_entries.get_mut().unwrap().remove(&key); + debug_assert_eq!(loading_thread, Some((slot, std::thread::current().id()))); + // Check that it will be visible to our own fork once inserted + if loaded_program.deployment_slot > self.latest_root_slot + && !matches!( + self.fork_graph + .as_ref() + .unwrap() + .read() + .unwrap() + .relationship(loaded_program.deployment_slot, slot), + BlockRelation::Equal | BlockRelation::Ancestor + ) + { + self.stats.lost_insertions.fetch_add(1, Ordering::Relaxed); + } + let was_occupied = self.assign_program(key, loaded_program); + self.loading_task_waiter.notify(); + was_occupied + } } - let was_occupied = self.assign_program(key, loaded_program); - self.loading_task_waiter.notify(); - was_occupied } pub fn merge(&mut self, tx_batch_cache: &ProgramCacheForTxBatch) { @@ -1105,45 +1140,51 @@ impl ProgramCache { include_program_runtime_v1: bool, include_program_runtime_v2: bool, ) -> Vec<(Pubkey, Arc)> { - self.entries - .iter() - .flat_map(|(id, second_level)| { - second_level - .iter() - .filter_map(move |program| match program.program { - ProgramCacheEntryType::Loaded(_) => { - if (program.account_owner != ProgramCacheEntryOwner::LoaderV4 - && include_program_runtime_v1) - || (program.account_owner == ProgramCacheEntryOwner::LoaderV4 - && include_program_runtime_v2) - { - Some((*id, program.clone())) - } else { - None + match &self.index { + IndexImplementation::V1 { entries, .. } => entries + .iter() + .flat_map(|(id, second_level)| { + second_level + .iter() + .filter_map(move |program| match program.program { + ProgramCacheEntryType::Loaded(_) => { + if (program.account_owner != ProgramCacheEntryOwner::LoaderV4 + && include_program_runtime_v1) + || (program.account_owner == ProgramCacheEntryOwner::LoaderV4 + && include_program_runtime_v2) + { + Some((*id, program.clone())) + } else { + None + } } - } - _ => None, - }) - }) - .collect() + _ => None, + }) + }) + .collect(), + } } /// Returns the list of all entries in the cache. pub fn get_flattened_entries_for_tests(&self) -> Vec<(Pubkey, Arc)> { - self.entries - .iter() - .flat_map(|(id, second_level)| { - second_level.iter().map(|program| (*id, program.clone())) - }) - .collect() + match &self.index { + IndexImplementation::V1 { entries, .. } => entries + .iter() + .flat_map(|(id, second_level)| { + second_level.iter().map(|program| (*id, program.clone())) + }) + .collect(), + } } /// Returns the slot versions for the given program id. pub fn get_slot_versions_for_tests(&self, key: &Pubkey) -> &[Arc] { - self.entries - .get(key) - .map(|second_level| second_level.as_ref()) - .unwrap_or(&[]) + match &self.index { + IndexImplementation::V1 { entries, .. } => entries + .get(key) + .map(|second_level| second_level.as_ref()) + .unwrap_or(&[]), + } } /// Unloads programs which were used infrequently @@ -1193,33 +1234,41 @@ impl ProgramCache { /// Removes all the entries at the given keys, if they exist pub fn remove_programs(&mut self, keys: impl Iterator) { - for k in keys { - self.entries.remove(&k); + match &mut self.index { + IndexImplementation::V1 { entries, .. } => { + for k in keys { + entries.remove(&k); + } + } } } /// This function removes the given entry for the given program from the cache. /// The function expects that the program and entry exists in the cache. Otherwise it'll panic. fn unload_program_entry(&mut self, program: &Pubkey, remove_entry: &Arc) { - let second_level = self.entries.get_mut(program).expect("Cache lookup failed"); - let candidate = second_level - .iter_mut() - .find(|entry| entry == &remove_entry) - .expect("Program entry not found"); - - // Certain entry types cannot be unloaded, such as tombstones, or already unloaded entries. - // For such entries, `to_unloaded()` will return None. - // These entry types do not occupy much memory. - if let Some(unloaded) = candidate.to_unloaded() { - if candidate.tx_usage_counter.load(Ordering::Relaxed) == 1 { - self.stats.one_hit_wonders.fetch_add(1, Ordering::Relaxed); + match &mut self.index { + IndexImplementation::V1 { entries, .. } => { + let second_level = entries.get_mut(program).expect("Cache lookup failed"); + let candidate = second_level + .iter_mut() + .find(|entry| entry == &remove_entry) + .expect("Program entry not found"); + + // Certain entry types cannot be unloaded, such as tombstones, or already unloaded entries. + // For such entries, `to_unloaded()` will return None. + // These entry types do not occupy much memory. + if let Some(unloaded) = candidate.to_unloaded() { + if candidate.tx_usage_counter.load(Ordering::Relaxed) == 1 { + self.stats.one_hit_wonders.fetch_add(1, Ordering::Relaxed); + } + self.stats + .evictions + .entry(*program) + .and_modify(|c| saturating_add_assign!(*c, 1)) + .or_insert(1); + *candidate = Arc::new(unloaded); + } } - self.stats - .evictions - .entry(*program) - .and_modify(|c| saturating_add_assign!(*c, 1)) - .or_insert(1); - *candidate = Arc::new(unloaded); } } @@ -1233,14 +1282,17 @@ impl ProgramCache { } fn remove_programs_with_no_entries(&mut self) { - let num_programs_before_removal = self.entries.len(); - self.entries - .retain(|_key, second_level| !second_level.is_empty()); - if self.entries.len() < num_programs_before_removal { - self.stats.empty_entries.fetch_add( - num_programs_before_removal.saturating_sub(self.entries.len()) as u64, - Ordering::Relaxed, - ); + match &mut self.index { + IndexImplementation::V1 { entries, .. } => { + let num_programs_before_removal = entries.len(); + entries.retain(|_key, second_level| !second_level.is_empty()); + if entries.len() < num_programs_before_removal { + self.stats.empty_entries.fetch_add( + num_programs_before_removal.saturating_sub(entries.len()) as u64, + Ordering::Relaxed, + ); + } + } } } }