Skip to content

Commit

Permalink
Find and load missing programs in LoadedPrograms cache (solana-labs#3…
Browse files Browse the repository at this point in the history
…0275)

* Find and load missing programs in LoadedPrograms cache

- filter program accounts in a transaction batch
- filter the accounts that are missing in LoadedPrograms cache
- load the programs before processing the transactions
- unit tests

* address review comments

* fix clippy

* address review comments

* fix test

* fix more tests
  • Loading branch information
pgarg66 authored and nickfrosty committed Mar 12, 2023
1 parent fe484cf commit 60bd60c
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 33 deletions.
7 changes: 4 additions & 3 deletions program-runtime/src/executor_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ impl TransactionExecutorCache {
self.visible.get(key).cloned()
}

pub fn set_tombstone(&mut self, key: Pubkey) {
pub fn set_tombstone(&mut self, key: Pubkey, slot: Slot) {
self.visible
.insert(key, Arc::new(LoadedProgram::new_tombstone()));
.insert(key, Arc::new(LoadedProgram::new_tombstone(slot)));
}

pub fn set(
Expand All @@ -52,12 +52,13 @@ impl TransactionExecutorCache {
executor: Arc<LoadedProgram>,
upgrade: bool,
delay_visibility_of_program_deployment: bool,
current_slot: Slot,
) {
if upgrade {
if delay_visibility_of_program_deployment {
// Place a tombstone in the cache so that
// we don't load the new version from the database as it should remain invisible
self.set_tombstone(key);
self.set_tombstone(key, current_slot);
} else {
self.visible.insert(key, executor.clone());
}
Expand Down
129 changes: 107 additions & 22 deletions program-runtime/src/loaded_programs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,12 @@ impl LoadedProgram {
}
}

pub fn new_tombstone() -> Self {
pub fn new_tombstone(slot: Slot) -> Self {
Self {
program: LoadedProgramType::Invalid,
account_size: 0,
deployment_slot: 0,
effective_slot: 0,
deployment_slot: slot,
effective_slot: slot,
usage_counter: AtomicU64::default(),
}
}
Expand Down Expand Up @@ -219,8 +219,10 @@ pub enum LoadedProgramEntry {
}

impl LoadedPrograms {
/// Inserts a single entry
pub fn insert_entry(&mut self, key: Pubkey, entry: LoadedProgram) -> LoadedProgramEntry {
/// Refill the cache with a single entry. It's typically called during transaction processing,
/// when the cache doesn't contain the entry corresponding to program `key`.
/// The function dedupes the cache, in case some other thread replenished the the entry in parallel.
pub fn replenish(&mut self, key: Pubkey, entry: Arc<LoadedProgram>) -> LoadedProgramEntry {
let second_level = self.entries.entry(key).or_insert_with(Vec::new);
let index = second_level
.iter()
Expand All @@ -235,9 +237,32 @@ impl LoadedPrograms {
return LoadedProgramEntry::WasOccupied(existing.clone());
}
}
let new_entry = Arc::new(entry);
second_level.insert(index.unwrap_or(second_level.len()), new_entry.clone());
LoadedProgramEntry::WasVacant(new_entry)
second_level.insert(index.unwrap_or(second_level.len()), entry.clone());
LoadedProgramEntry::WasVacant(entry)
}

/// Assign the program `entry` to the given `key` in the cache.
/// This is typically called when a deployed program is managed (upgraded/un/reddeployed) via
/// bpf loader instructions.
/// The program management is not expected to overlap with initial program deployment slot.
/// Note: Do not call this function to replenish cache with a missing entry. As that use-case can
/// cause the cache to have duplicates. Use `replenish()` API for that use-case.
pub fn assign_program(&mut self, key: Pubkey, entry: Arc<LoadedProgram>) -> Arc<LoadedProgram> {
let second_level = self.entries.entry(key).or_insert_with(Vec::new);
let index = second_level
.iter()
.position(|at| at.effective_slot >= entry.effective_slot);
if let Some(index) = index {
let existing = second_level
.get(index)
.expect("Missing entry, even though position was found");
assert!(
existing.deployment_slot != entry.deployment_slot
|| existing.effective_slot != entry.effective_slot
);
}
second_level.insert(index.unwrap_or(second_level.len()), entry.clone());
entry
}

/// Before rerooting the blockstore this removes all programs of orphan forks
Expand Down Expand Up @@ -310,6 +335,7 @@ mod tests {
BlockRelation, ForkGraph, LoadedProgram, LoadedProgramEntry, LoadedProgramType,
LoadedPrograms, WorkingSlot,
},
solana_rbpf::vm::BuiltInProgram,
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::{
collections::HashMap,
Expand All @@ -318,11 +344,70 @@ mod tests {
},
};

fn new_test_builtin_program(deployment_slot: Slot, effective_slot: Slot) -> Arc<LoadedProgram> {
Arc::new(LoadedProgram {
program: LoadedProgramType::BuiltIn(BuiltInProgram::default()),
account_size: 0,
deployment_slot,
effective_slot,
usage_counter: AtomicU64::default(),
})
}

fn set_tombstone(cache: &mut LoadedPrograms, key: Pubkey, slot: Slot) -> Arc<LoadedProgram> {
cache.assign_program(key, Arc::new(LoadedProgram::new_tombstone(slot)))
}

#[test]
fn test_tombstone() {
let tombstone = LoadedProgram::new_tombstone();
let tombstone = LoadedProgram::new_tombstone(0);
assert!(matches!(tombstone.program, LoadedProgramType::Invalid));
assert!(tombstone.is_tombstone());
assert_eq!(tombstone.deployment_slot, 0);
assert_eq!(tombstone.effective_slot, 0);

let tombstone = LoadedProgram::new_tombstone(100);
assert!(matches!(tombstone.program, LoadedProgramType::Invalid));
assert!(tombstone.is_tombstone());
assert_eq!(tombstone.deployment_slot, 100);
assert_eq!(tombstone.effective_slot, 100);

let mut cache = LoadedPrograms::default();
let program1 = Pubkey::new_unique();
let tombstone = set_tombstone(&mut cache, program1, 10);
let second_level = &cache
.entries
.get(&program1)
.expect("Failed to find the entry");
assert_eq!(second_level.len(), 1);
assert!(second_level.get(0).unwrap().is_tombstone());
assert_eq!(tombstone.deployment_slot, 10);
assert_eq!(tombstone.effective_slot, 10);

// Add a program at slot 50, and a tombstone for the program at slot 60
let program2 = Pubkey::new_unique();
assert!(matches!(
cache.replenish(program2, new_test_builtin_program(50, 51)),
LoadedProgramEntry::WasVacant(_)
));
let second_level = &cache
.entries
.get(&program2)
.expect("Failed to find the entry");
assert_eq!(second_level.len(), 1);
assert!(!second_level.get(0).unwrap().is_tombstone());

let tombstone = set_tombstone(&mut cache, program2, 60);
let second_level = &cache
.entries
.get(&program2)
.expect("Failed to find the entry");
assert_eq!(second_level.len(), 2);
assert!(!second_level.get(0).unwrap().is_tombstone());
assert!(second_level.get(1).unwrap().is_tombstone());
assert!(tombstone.is_tombstone());
assert_eq!(tombstone.deployment_slot, 60);
assert_eq!(tombstone.effective_slot, 60);
}

struct TestForkGraph {
Expand Down Expand Up @@ -464,14 +549,14 @@ mod tests {
}
}

fn new_test_loaded_program(deployment_slot: Slot, effective_slot: Slot) -> LoadedProgram {
LoadedProgram {
fn new_test_loaded_program(deployment_slot: Slot, effective_slot: Slot) -> Arc<LoadedProgram> {
Arc::new(LoadedProgram {
program: LoadedProgramType::Invalid,
account_size: 0,
deployment_slot,
effective_slot,
usage_counter: AtomicU64::default(),
}
})
}

fn match_slot(
Expand Down Expand Up @@ -511,52 +596,52 @@ mod tests {

let program1 = Pubkey::new_unique();
assert!(matches!(
cache.insert_entry(program1, new_test_loaded_program(0, 1)),
cache.replenish(program1, new_test_loaded_program(0, 1)),
LoadedProgramEntry::WasVacant(_)
));
assert!(matches!(
cache.insert_entry(program1, new_test_loaded_program(10, 11)),
cache.replenish(program1, new_test_loaded_program(10, 11)),
LoadedProgramEntry::WasVacant(_)
));
assert!(matches!(
cache.insert_entry(program1, new_test_loaded_program(20, 21)),
cache.replenish(program1, new_test_loaded_program(20, 21)),
LoadedProgramEntry::WasVacant(_)
));

// Test: inserting duplicate entry return pre existing entry from the cache
assert!(matches!(
cache.insert_entry(program1, new_test_loaded_program(20, 21)),
cache.replenish(program1, new_test_loaded_program(20, 21)),
LoadedProgramEntry::WasOccupied(_)
));

let program2 = Pubkey::new_unique();
assert!(matches!(
cache.insert_entry(program2, new_test_loaded_program(5, 6)),
cache.replenish(program2, new_test_loaded_program(5, 6)),
LoadedProgramEntry::WasVacant(_)
));
assert!(matches!(
cache.insert_entry(program2, new_test_loaded_program(11, 12)),
cache.replenish(program2, new_test_loaded_program(11, 12)),
LoadedProgramEntry::WasVacant(_)
));

let program3 = Pubkey::new_unique();
assert!(matches!(
cache.insert_entry(program3, new_test_loaded_program(25, 26)),
cache.replenish(program3, new_test_loaded_program(25, 26)),
LoadedProgramEntry::WasVacant(_)
));

let program4 = Pubkey::new_unique();
assert!(matches!(
cache.insert_entry(program4, new_test_loaded_program(0, 1)),
cache.replenish(program4, new_test_loaded_program(0, 1)),
LoadedProgramEntry::WasVacant(_)
));
assert!(matches!(
cache.insert_entry(program4, new_test_loaded_program(5, 6)),
cache.replenish(program4, new_test_loaded_program(5, 6)),
LoadedProgramEntry::WasVacant(_)
));
// The following is a special case, where effective slot is 4 slots in the future
assert!(matches!(
cache.insert_entry(program4, new_test_loaded_program(15, 19)),
cache.replenish(program4, new_test_loaded_program(15, 19)),
LoadedProgramEntry::WasVacant(_)
));

Expand Down
5 changes: 4 additions & 1 deletion programs/bpf_loader/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ pub fn load_program_from_account(
loaded_program.clone(),
false,
feature_set.is_active(&delay_visibility_of_program_deployment::id()),
deployment_slot,
);
}

Expand Down Expand Up @@ -291,6 +292,7 @@ macro_rules! deploy_program {
Arc::new(executor),
true,
delay_visibility_of_program_deployment,
$slot,
);
}};
}
Expand Down Expand Up @@ -1183,10 +1185,11 @@ fn process_loader_upgradeable_instruction(
.feature_set
.is_active(&delay_visibility_of_program_deployment::id())
{
let clock = invoke_context.get_sysvar_cache().get_clock()?;
invoke_context
.tx_executor_cache
.borrow_mut()
.set_tombstone(program_key);
.set_tombstone(program_key, clock.slot);
}
}
_ => {
Expand Down
83 changes: 82 additions & 1 deletion runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ use {
compute_budget::{self, ComputeBudget},
executor_cache::{BankExecutorCache, TransactionExecutorCache, MAX_CACHED_EXECUTORS},
invoke_context::{BuiltinProgram, ProcessInstructionWithContext},
loaded_programs::{LoadedProgram, LoadedPrograms, WorkingSlot},
loaded_programs::{LoadedProgram, LoadedProgramEntry, LoadedPrograms, WorkingSlot},
log_collector::LogCollector,
sysvar_cache::SysvarCache,
timings::{ExecuteTimingType, ExecuteTimings},
Expand All @@ -105,6 +105,7 @@ use {
AccountSharedData, InheritableAccountFields, ReadableAccount, WritableAccount,
},
account_utils::StateMut,
bpf_loader, bpf_loader_deprecated,
bpf_loader_upgradeable::{self, UpgradeableLoaderState},
clock::{
BankId, Epoch, Slot, SlotCount, SlotIndex, UnixTimestamp, DEFAULT_HASHES_PER_TICK,
Expand Down Expand Up @@ -4338,6 +4339,68 @@ impl Bank {
|| self.cluster_type() != ClusterType::MainnetBeta
}

#[allow(dead_code)] // Preparation for BankExecutorCache rework
fn load_and_get_programs_from_cache<'a>(
&self,
program_owners: &[&'a Pubkey],
sanitized_txs: &[SanitizedTransaction],
check_results: &mut [TransactionCheckResult],
) -> (
HashMap<Pubkey, &'a Pubkey>,
HashMap<Pubkey, Arc<LoadedProgram>>,
) {
let mut filter_programs_time = Measure::start("filter_programs_accounts");
let program_accounts_map = self.rc.accounts.filter_executable_program_accounts(
&self.ancestors,
sanitized_txs,
check_results,
program_owners,
&self.blockhash_queue.read().unwrap(),
);
filter_programs_time.stop();

let mut filter_missing_programs_time = Measure::start("filter_missing_programs_accounts");
let (mut loaded_programs_for_txs, missing_programs) = self
.loaded_programs_cache
.read()
.unwrap()
.extract(self, program_accounts_map.keys().cloned());
filter_missing_programs_time.stop();

missing_programs
.iter()
.for_each(|pubkey| match self.load_program(pubkey) {
Ok(program) => {
match self
.loaded_programs_cache
.write()
.unwrap()
.replenish(*pubkey, program)
{
LoadedProgramEntry::WasOccupied(entry) => {
loaded_programs_for_txs.insert(*pubkey, entry);
}
LoadedProgramEntry::WasVacant(new_entry) => {
loaded_programs_for_txs.insert(*pubkey, new_entry);
}
}
}

Err(e) => {
// Create a tombstone for the program in the cache
debug!("Failed to load program {}, error {:?}", pubkey, e);
let tombstone = self
.loaded_programs_cache
.write()
.unwrap()
.assign_program(*pubkey, Arc::new(LoadedProgram::new_tombstone(self.slot)));
loaded_programs_for_txs.insert(*pubkey, tombstone);
}
});

(program_accounts_map, loaded_programs_for_txs)
}

#[allow(clippy::type_complexity)]
pub fn load_and_execute_transactions(
&self,
Expand Down Expand Up @@ -4400,6 +4463,24 @@ impl Bank {
);
check_time.stop();

let program_owners: Vec<Pubkey> = vec![
bpf_loader_upgradeable::id(),
bpf_loader::id(),
bpf_loader_deprecated::id(),
native_loader::id(),
];

let _program_owners_refs: Vec<&Pubkey> = program_owners.iter().collect();
// The following code is currently commented out. This is how the new cache will
// finally be used, once rest of the code blocks are in place.
/*
let (program_accounts_map, loaded_programs_map) = self.load_and_get_programs_from_cache(
&program_owners_refs,
sanitized_txs,
&check_results,
);
*/

let mut load_time = Measure::start("accounts_load");
let mut loaded_transactions = self.rc.accounts.load_accounts(
&self.ancestors,
Expand Down
Loading

0 comments on commit 60bd60c

Please sign in to comment.