Skip to content

Commit

Permalink
Adds LoadedProgramType::Loading and LoadedProgram::new_loading().
Browse files Browse the repository at this point in the history
  • Loading branch information
Lichtso committed Nov 27, 2023
1 parent deee5f8 commit 90c139c
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 14 deletions.
62 changes: 51 additions & 11 deletions program-runtime/src/loaded_programs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub enum LoadedProgramType {
#[default]
Closed,
DelayVisibility,
/// Currently enqueued in the cooperative loading phase
Loading(Mutex<(bool, Vec<Arc<Mutex<ExtractedPrograms>>>)>),
/// Successfully verified but not currently compiled, used to track usage statistics when a compiled program is evicted from memory.
Unloaded(ProgramRuntimeEnvironment),
LegacyV0(Executable<InvokeContext<'static>>),
Expand All @@ -97,6 +99,7 @@ impl Debug for LoadedProgramType {
}
LoadedProgramType::Closed => write!(f, "LoadedProgramType::Closed"),
LoadedProgramType::DelayVisibility => write!(f, "LoadedProgramType::DelayVisibility"),
LoadedProgramType::Loading(_) => write!(f, "LoadedProgramType::Loading"),
LoadedProgramType::Unloaded(_) => write!(f, "LoadedProgramType::Unloaded"),
LoadedProgramType::LegacyV0(_) => write!(f, "LoadedProgramType::LegacyV0"),
LoadedProgramType::LegacyV1(_) => write!(f, "LoadedProgramType::LegacyV1"),
Expand Down Expand Up @@ -275,6 +278,18 @@ impl LoadedProgram {
)
}

fn new_loading() -> Self {
Self {
program: LoadedProgramType::Loading(Mutex::new((false, vec![]))),
account_size: 0,
deployment_slot: 0,
effective_slot: 0,
maybe_expiration_slot: None,
tx_usage_counter: AtomicU64::new(0),
ix_usage_counter: AtomicU64::new(0),
}
}

/// Reloads a user program, *without* running the verifier.
///
/// # Safety
Expand Down Expand Up @@ -511,7 +526,7 @@ pub struct LoadedProgramsForTxBatch {

pub struct ExtractedPrograms {
pub loaded: LoadedProgramsForTxBatch,
pub missing: HashMap<Pubkey, (u64, bool)>,
pub missing: HashMap<Pubkey, (Arc<LoadedProgram>, bool)>,
}

impl LoadedProgramsForTxBatch {
Expand Down Expand Up @@ -791,7 +806,7 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
/// Extracts a subset of the programs relevant to a transaction batch
/// and returns which program accounts the accounts DB needs to load.
pub fn extract<S: WorkingSlot>(
&self,
&mut self,
working_slot: &S,
keys: impl Iterator<Item = (Pubkey, (LoadedProgramMatchCriteria, u64))>,
) -> Arc<Mutex<ExtractedPrograms>> {
Expand All @@ -808,9 +823,13 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
let mut extracting = extracted.lock().unwrap();
extracting.loaded.entries = keys
.filter_map(|(key, (match_criteria, count))| {
let mut missing_entry = None;
let mut reloading = false;
if let Some(second_level) = self.entries.get(&key) {
for entry in second_level.iter().rev() {
if let LoadedProgramType::Loading(_) = &entry.program {
continue;
}
let is_ancestor = if let Some(fork_graph) = &self.fork_graph {
fork_graph
.read()
Expand All @@ -825,7 +844,6 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
} else {
working_slot.is_ancestor(entry.deployment_slot)
};

if entry.deployment_slot <= self.latest_root_slot
|| entry.deployment_slot == current_slot
|| is_ancestor
Expand All @@ -842,15 +860,13 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
break;
}

let mut usage_count =
entry.tx_usage_counter.load(Ordering::Relaxed);
saturating_add_assign!(usage_count, count);
entry.tx_usage_counter.store(usage_count, Ordering::Relaxed);
entry.tx_usage_counter.fetch_add(count, Ordering::Relaxed);
return Some((key, entry.clone()));
} else if entry.is_implicit_delay_visibility_tombstone(current_slot) {
// Found a program entry on the current fork, but it's not effective
// yet. It indicates that the program has delayed visibility. Return
// the tombstone to reflect that.
entry.tx_usage_counter.fetch_add(count, Ordering::Relaxed);
return Some((
key,
Arc::new(LoadedProgram::new_tombstone(
Expand All @@ -861,8 +877,15 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
}
}
}
if let Some(entry) = second_level.first() {
if let LoadedProgramType::Loading(_) = &entry.program {
missing_entry = Some(entry.clone());
}
}
}
extracting.missing.insert(key, (count, reloading));
let entry = missing_entry.unwrap_or_else(|| Arc::new(LoadedProgram::new_loading()));
entry.tx_usage_counter.fetch_add(count, Ordering::Relaxed);
extracting.missing.insert(key, (entry, reloading));
None
})
.collect::<HashMap<Pubkey, Arc<LoadedProgram>>>();
Expand All @@ -872,6 +895,23 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
self.stats
.hits
.fetch_add(extracting.loaded.entries.len() as u64, Ordering::Relaxed);
for (key, (entry, _reloading)) in extracting.missing.iter() {
if let LoadedProgramType::Loading(mutex) = &entry.program {
// The Mutex around `waiting_list` is strictly speaking unnecessary
// because the entire `LoadedPrograms` cache is already locked here.
let waiting_list = &mut mutex.lock().unwrap().1;
if waiting_list.is_empty() {
self.assign_program(*key, entry.clone());
}
let index = match waiting_list.binary_search_by(|tx_batch| {
tx_batch.lock().unwrap().loaded.slot.cmp(&current_slot)
}) {
Ok(index) => index,
Err(index) => index,
};
waiting_list.insert(index, extracted.clone());
}
}
drop(extracting);
extracted
}
Expand Down Expand Up @@ -1648,7 +1688,7 @@ mod tests {
extracted
.missing
.get(key)
.filter(|(_count, reloading)| *reloading == reload)
.filter(|(_entry, reloading)| *reloading == reload)
.is_some()
}

Expand Down Expand Up @@ -2257,7 +2297,7 @@ mod tests {
.get(&program1)
.expect("Didn't find program1")
.len(),
3
4
);

// New root 5 should not evict the expired entry for program1
Expand All @@ -2268,7 +2308,7 @@ mod tests {
.get(&program1)
.expect("Didn't find program1")
.len(),
1
2
);

// New root 15 should evict the expired entry for program1
Expand Down
9 changes: 6 additions & 3 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5073,7 +5073,7 @@ impl Bank {
missing,
} = {
// Lock the global cache to figure out which programs need to be loaded
let loaded_programs_cache = self.loaded_programs_cache.read().unwrap();
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
Mutex::into_inner(
Arc::into_inner(
loaded_programs_cache.extract(self, programs_and_slots.into_iter()),
Expand All @@ -5086,9 +5086,12 @@ impl Bank {
// Load missing programs while global cache is unlocked
let missing_programs: Vec<(Pubkey, Arc<LoadedProgram>)> = missing
.iter()
.map(|(key, (count, reloading))| {
.map(|(key, (entry, reloading))| {
let program = self.load_program(key, *reloading, None);
program.tx_usage_counter.store(*count, Ordering::Relaxed);
program.tx_usage_counter.store(
entry.tx_usage_counter.load(Ordering::Relaxed),
Ordering::Relaxed,
);
(*key, program)
})
.collect();
Expand Down

0 comments on commit 90c139c

Please sign in to comment.