Skip to content

Commit

Permalink
Adds LoadedPrograms::next_cooperative_loading_task() and LoadedPrograms::cooperative_loading_task_complete().
Browse files Browse the repository at this point in the history
  • Loading branch information
Lichtso committed Nov 27, 2023
1 parent 7f48b58 commit edf1dcb
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 46 deletions.
97 changes: 84 additions & 13 deletions program-runtime/src/loaded_programs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -844,15 +844,13 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
break;
}

let mut usage_count =
entry.tx_usage_counter.load(Ordering::Relaxed);
saturating_add_assign!(usage_count, count);
entry.tx_usage_counter.store(usage_count, Ordering::Relaxed);
entry.tx_usage_counter.fetch_add(count, Ordering::Relaxed);
return Some((key, entry.clone()));
} else if entry.is_implicit_delay_visibility_tombstone(current_slot) {
// Found a program entry on the current fork, but it's not effective
// yet. It indicates that the program has delayed visibility. Return
// the tombstone to reflect that.
entry.tx_usage_counter.fetch_add(count, Ordering::Relaxed);
return Some((
key,
Arc::new(LoadedProgram::new_tombstone(
Expand All @@ -865,19 +863,13 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
}
if let Some(entry) = second_level.first() {
if let LoadedProgramType::Loading(_) = &entry.program {
entry.tx_usage_counter.store(count, Ordering::Relaxed);
missing_entry = Some(entry.clone());
}
}
}
extracting.missing.insert(
key,
(
missing_entry
.unwrap_or_else(|| Arc::new(LoadedProgram::new_loading(count))),
reloading,
),
);
let entry = missing_entry.unwrap_or_else(|| Arc::new(LoadedProgram::new_loading()));
entry.tx_usage_counter.fetch_add(count, Ordering::Relaxed);
extracting.missing.insert(key, (entry, reloading));
None
})
.collect::<HashMap<Pubkey, Arc<LoadedProgram>>>();
Expand Down Expand Up @@ -908,6 +900,85 @@ impl<FG: ForkGraph> LoadedPrograms<FG> {
extracted
}

/// In cooperative loading a TX batch calls this to receive the next task
///
/// Scans this batch's `missing` map for an entry that is still in
/// `LoadedProgramType::Loading` state and not yet claimed by another caller,
/// marks it as processing, and returns `(key, placeholder entry, reload flag)`.
/// Returns `None` when every missing entry is already claimed or processed.
pub fn next_cooperative_loading_task(
    &mut self,
    extracted: &Arc<Mutex<ExtractedPrograms>>,
) -> Option<(Pubkey, Arc<LoadedProgram>, bool)> {
    // The Mutexes are strictly speaking unnecessary
    // because the entire `LoadedPrograms` cache is already locked here.
    let extracted = extracted.lock().unwrap();
    // Find an unclaimed entry: the first tuple field of the Loading mutex is
    // the `processing` flag; `false` means no caller has claimed it yet.
    let (key, (entry, reload)) =
        extracted.missing.iter().find(|(_key, (entry, _reload))| {
            let LoadedProgramType::Loading(mutex) = &entry.program else {
                // Entries in `missing` are expected to always be Loading.
                debug_assert!(false);
                return false;
            };
            let processing = mutex.lock().unwrap().0;
            !processing
        })?;
    // Clone out of the iterator borrow so the batch lock can be dropped
    // before taking the per-entry mutex below.
    let (key, entry, reload) = (*key, entry.clone(), *reload);
    drop(extracted);
    {
        let LoadedProgramType::Loading(mutex) = &entry.program else {
            debug_assert!(false);
            return None;
        };
        // Claim the task for this caller.
        let processing = &mut mutex.lock().unwrap().0;
        *processing = true;
    }
    Some((key, entry, reload))
}

/// Upon completing a task in cooperative loading a TX batch calls this to submit the result
///
/// `loading` is the placeholder entry previously handed out by
/// `next_cooperative_loading_task()`; `loaded` is the freshly loaded program.
/// Every waiting TX batch whose slot can see this deployment (rooted, equal,
/// or descendant per the fork graph) receives the result, and the loaded
/// entry is installed in the global cache.
pub fn cooperative_loading_task_complete(
    &mut self,
    key: Pubkey,
    loading: Arc<LoadedProgram>,
    loaded: Arc<LoadedProgram>,
) {
    let LoadedProgramType::Loading(mutex) = &loading.program else {
        // `loading` must be the placeholder returned by
        // `next_cooperative_loading_task()`, which is always Loading.
        debug_assert!(false);
        return;
    };
    // Carry the usage count accumulated on the placeholder during extraction
    // over to the final entry so it is not lost.
    loaded.tx_usage_counter.store(
        loading.tx_usage_counter.load(Ordering::Relaxed),
        Ordering::Relaxed,
    );
    let mut mutex = mutex.lock().unwrap();
    let processing = &mut mutex.0;
    // Unclaim the task; the result is distributed below while the per-entry
    // mutex is still held.
    *processing = false;
    let waiting_list_is_empty = {
        let fork_graph = self
            .fork_graph
            .as_ref()
            .expect("Program cache doesn't have fork graph.");
        let fork_graph = fork_graph
            .read()
            .expect("Failed to lock fork graph for reading.");
        // Second tuple field of the Loading mutex: batches waiting on this key.
        let waiting_list = &mut mutex.1;
        waiting_list.retain(|waiting| {
            // The Mutex around `waiting` is strictly speaking unnecessary
            // because the entire `LoadedPrograms` cache is already locked here.
            let mut waiting = waiting.lock().unwrap();
            let relation = fork_graph.relationship(loaded.deployment_slot, waiting.loaded.slot);
            if loaded.deployment_slot <= self.latest_root_slot
                || matches!(relation, BlockRelation::Equal | BlockRelation::Descendant)
            {
                // Deployment is visible on this batch's fork: deliver the
                // result and remove the batch from the waiting list.
                waiting.missing.remove(&key);
                waiting.loaded.assign_program(key, loaded.clone());
                return false;
            }
            // Not visible from this batch's slot; the batch keeps waiting.
            true
        });
        waiting_list.is_empty()
    };
    if waiting_list_is_empty {
        // No batch references the placeholder anymore; drop it from the cache
        // before inserting the loaded entry.
        self.remove_program(key, &loading);
    }
    self.assign_program(key, loaded);
}

pub fn merge(&mut self, tx_batch_cache: &LoadedProgramsForTxBatch) {
tx_batch_cache.entries.iter().for_each(|(key, entry)| {
self.assign_program(*key, entry.clone());
Expand Down
67 changes: 34 additions & 33 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ use {
compute_budget_processor::process_compute_budget_instructions,
invoke_context::BuiltinFunctionWithContext,
loaded_programs::{
ExtractedPrograms, LoadProgramMetrics, LoadedProgram, LoadedProgramMatchCriteria,
LoadedProgramType, LoadedPrograms, LoadedProgramsForTxBatch, ProgramRuntimeEnvironment,
LoadProgramMetrics, LoadedProgram, LoadedProgramMatchCriteria, LoadedProgramType,
LoadedPrograms, LoadedProgramsForTxBatch, ProgramRuntimeEnvironment,
ProgramRuntimeEnvironments, WorkingSlot, DELAY_VISIBILITY_SLOT_OFFSET,
},
log_collector::LogCollector,
Expand Down Expand Up @@ -5068,42 +5068,43 @@ impl Bank {
.collect()
};

let ExtractedPrograms {
loaded: mut loaded_programs_for_txs,
missing,
} = {
let extracted = {
// Lock the global cache to figure out which programs need to be loaded
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
Mutex::into_inner(
Arc::into_inner(
loaded_programs_cache.extract(self, programs_and_slots.into_iter()),
)
.unwrap(),
)
.unwrap()
loaded_programs_cache.extract(self, programs_and_slots.into_iter())
};

// Load missing programs while global cache is unlocked
let missing_programs: Vec<(Pubkey, Arc<LoadedProgram>)> = missing
.iter()
.map(|(key, (entry, reloading))| {
let program = self.load_program(key, *reloading, None);
program.tx_usage_counter.store(
entry.tx_usage_counter.load(Ordering::Relaxed),
Ordering::Relaxed,
);
(*key, program)
})
.collect();

// Lock the global cache again to replenish the missing programs
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
for (key, program) in missing_programs {
let entry = loaded_programs_cache.assign_program(key, program);
// Use the returned entry as that might have been deduplicated globally
loaded_programs_for_txs.assign_program(key, entry);
// Cooperative loading phase
let mut finished_task = None;
loop {
// Critical section for global coordination
let (key, loading, reload) = {
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
if let Some((key, loading, loaded)) = finished_task.take() {
loaded_programs_cache.cooperative_loading_task_complete(key, loading, loaded);
}
if Arc::strong_count(&extracted) == 1 {
// All the missing entries for this batch have been loaded
break;
}
if let Some(task) = loaded_programs_cache.next_cooperative_loading_task(&extracted)
{
task
} else {
// Waiting for some other TX batch to complete loading the programs needed by this TX batch
// TODO: Use a Condvar here
continue;
}
};
// Load, verify and compile the program outside of the critical section
let loaded = self.load_program(&key, reload, None);
finished_task = Some((key, loading, loaded));
}
loaded_programs_for_txs

// When we get here we should be the only remaining owner
std::sync::Mutex::into_inner(Arc::into_inner(extracted).unwrap())
.unwrap()
.loaded
}

/// Returns a hash map of executable program accounts (program accounts that are not writable
Expand Down

0 comments on commit edf1dcb

Please sign in to comment.