Skip to content

Commit

Permalink
Find and load missing programs in LoadedPrograms cache
Browse files Browse the repository at this point in the history
- filter program accounts in a transaction batch
- filter the accounts that are missing in LoadedPrograms cache
- load the programs before processing the transactions
- unit tests
  • Loading branch information
pgarg66 committed Feb 17, 2023
1 parent 3a4ba72 commit d7a80c8
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 6 deletions.
18 changes: 13 additions & 5 deletions program-runtime/src/loaded_programs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,12 @@ pub enum LoadedProgramEntry {
}

impl LoadedPrograms {
/// Inserts a single entry
pub fn insert_entry(&mut self, key: Pubkey, entry: LoadedProgram) -> LoadedProgramEntry {
/// Inserts a single entry wrapped in an Arc
pub fn insert_entry_arc(
&mut self,
key: Pubkey,
entry: Arc<LoadedProgram>,
) -> LoadedProgramEntry {
let second_level = self.entries.entry(key).or_insert_with(Vec::new);
let index = second_level
.iter()
Expand All @@ -239,9 +243,13 @@ impl LoadedPrograms {
return LoadedProgramEntry::WasOccupied(existing.clone());
}
}
let new_entry = Arc::new(entry);
second_level.insert(index.unwrap_or(second_level.len()), new_entry.clone());
LoadedProgramEntry::WasVacant(new_entry)
second_level.insert(index.unwrap_or(second_level.len()), entry.clone());
LoadedProgramEntry::WasVacant(entry)
}

/// Inserts a single entry
pub fn insert_entry(&mut self, key: Pubkey, entry: LoadedProgram) -> LoadedProgramEntry {
self.insert_entry_arc(key, Arc::new(entry))
}

/// Before rerooting the blockstore this removes all programs of orphan forks
Expand Down
83 changes: 82 additions & 1 deletion runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ use {
compute_budget::{self, ComputeBudget},
executor_cache::{BankExecutorCache, TransactionExecutorCache, MAX_CACHED_EXECUTORS},
invoke_context::{BuiltinProgram, ProcessInstructionWithContext},
loaded_programs::{LoadedProgram, LoadedPrograms, WorkingSlot},
loaded_programs::{LoadedProgram, LoadedProgramEntry, LoadedPrograms, WorkingSlot},
log_collector::LogCollector,
sysvar_cache::SysvarCache,
timings::{ExecuteTimingType, ExecuteTimings},
Expand All @@ -105,6 +105,7 @@ use {
AccountSharedData, InheritableAccountFields, ReadableAccount, WritableAccount,
},
account_utils::StateMut,
bpf_loader, bpf_loader_deprecated,
bpf_loader_upgradeable::{self, UpgradeableLoaderState},
clock::{
BankId, Epoch, Slot, SlotCount, SlotIndex, UnixTimestamp, DEFAULT_HASHES_PER_TICK,
Expand Down Expand Up @@ -4338,6 +4339,68 @@ impl Bank {
|| self.cluster_type() != ClusterType::MainnetBeta
}

#[allow(dead_code)] // Preparation for BankExecutorCache rework
fn load_and_get_programs_from_cache<'a>(
&self,
program_owners: &[&'a Pubkey],
sanitized_txs: &[SanitizedTransaction],
check_results: &mut [TransactionCheckResult],
) -> (
HashMap<Pubkey, &'a Pubkey>,
HashMap<Pubkey, Arc<LoadedProgram>>,
) {
let mut filter_programs_time = Measure::start("filter_programs_accounts");
let program_accounts_map = self.rc.accounts.filter_executable_program_accounts(
&self.ancestors,
sanitized_txs,
check_results,
program_owners,
&self.blockhash_queue.read().unwrap(),
);
filter_programs_time.stop();

let mut filter_missing_programs_time = Measure::start("filter_missing_programs_accounts");
let (mut loaded_programs, missing_programs) = self
.loaded_programs_cache
.read()
.unwrap()
.extract(self, program_accounts_map.keys().cloned());
filter_missing_programs_time.stop();

missing_programs
.iter()
.for_each(|pubkey| match self.load_program(pubkey) {
Ok(program) => {
match self
.loaded_programs_cache
.write()
.unwrap()
.insert_entry_arc(*pubkey, program)
{
LoadedProgramEntry::WasOccupied(entry) => {
loaded_programs.insert(*pubkey, entry);
}
LoadedProgramEntry::WasVacant(new_entry) => {
loaded_programs.insert(*pubkey, new_entry);
}
}
}

Err(e) => {
// Create a tombstone for the program in the cache??
debug!("Failed to load program {}, error {:?}", pubkey, e);
let tombstone = Arc::new(LoadedProgram::new_tombstone());
self.loaded_programs_cache
.write()
.unwrap()
.insert_entry_arc(*pubkey, tombstone.clone());
loaded_programs.insert(*pubkey, tombstone);
}
});

(program_accounts_map, loaded_programs)
}

#[allow(clippy::type_complexity)]
pub fn load_and_execute_transactions(
&self,
Expand Down Expand Up @@ -4400,6 +4463,24 @@ impl Bank {
);
check_time.stop();

let program_owners: Vec<Pubkey> = vec![
bpf_loader_upgradeable::id(),
bpf_loader::id(),
bpf_loader_deprecated::id(),
native_loader::id(),
];

let _program_owners_refs: Vec<&Pubkey> = program_owners.iter().collect();
// The following code is currently commented out. This is how the new cache will
// finally be used, once rest of the code blocks are in place.
/*
let (program_accounts_map, loaded_programs_map) = self.load_and_get_programs_from_cache(
&program_owners_refs,
sanitized_txs,
&check_results,
);
*/

let mut load_time = Measure::start("accounts_load");
let mut loaded_transactions = self.rc.accounts.load_accounts(
&self.ancestors,
Expand Down

0 comments on commit d7a80c8

Please sign in to comment.