From 3f42d8a58498c0cdcd2c0800432d1bf5da5e7b1f Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 16 Feb 2024 11:56:38 +0100 Subject: [PATCH] v1.18: Refactor - `LoadedPrograms::replenish()` (backport of #35145) (#35197) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactor - `LoadedPrograms::replenish()` (#35145) * Replace LoadedPrograms::replenish() with LoadedPrograms::assign_program(). * Removes LoadedPrograms::replenish(). * Defines replacement by having the same loaded program type. * Implements a proper insertion sort with a two key comparison operator. (cherry picked from commit 716ad5441bb35151c934c5f131608b6767ff6e3a) Co-authored-by: Alexander Meißner --- program-runtime/src/loaded_programs.rs | 252 +++++++++++++------------ runtime/src/bank.rs | 4 +- 2 files changed, 130 insertions(+), 126 deletions(-) diff --git a/program-runtime/src/loaded_programs.rs b/program-runtime/src/loaded_programs.rs index 66a97552d7e01a..b676830e5bc4cc 100644 --- a/program-runtime/src/loaded_programs.rs +++ b/program-runtime/src/loaded_programs.rs @@ -658,24 +658,20 @@ impl LoadedPrograms { &self.environments } - /// Refill the cache with a single entry. It's typically called during transaction loading, + /// Insert a single entry. It's typically called during transaction loading, /// when the cache doesn't contain the entry corresponding to program `key`. - /// The function dedupes the cache, in case some other thread replenished the entry in parallel. - pub fn replenish( - &mut self, - key: Pubkey, - entry: Arc, - ) -> (bool, Arc) { + pub fn assign_program(&mut self, key: Pubkey, entry: Arc) -> bool { let slot_versions = &mut self.entries.entry(key).or_default().slot_versions; - let index = slot_versions - .iter() - .position(|at| at.effective_slot >= entry.effective_slot); - if let Some(existing) = index.and_then(|index| slot_versions.get_mut(index)) { - if existing.deployment_slot == entry.deployment_slot - && existing.effective_slot == entry.effective_slot - { - if matches!(existing.program, LoadedProgramType::Unloaded(_)) { - // The unloaded program is getting reloaded + match slot_versions.binary_search_by(|at| { + at.effective_slot + .cmp(&entry.effective_slot) + .then(at.deployment_slot.cmp(&entry.deployment_slot)) + }) { + Ok(index) => { + let existing = slot_versions.get_mut(index).unwrap(); + if std::mem::discriminant(&existing.program) + != std::mem::discriminant(&entry.program) + { // Copy over the usage counter to the new entry entry.tx_usage_counter.fetch_add( existing.tx_usage_counter.load(Ordering::Relaxed), @@ -685,34 +681,21 @@ impl LoadedPrograms { existing.ix_usage_counter.load(Ordering::Relaxed), Ordering::Relaxed, ); + *existing = entry.clone(); self.stats.reloads.fetch_add(1, Ordering::Relaxed); - } else if existing.is_tombstone() != entry.is_tombstone() { - // Either the old entry is tombstone and the new one is not. - // (Let's give the new entry a chance). - // Or, the old entry is not a tombstone and the new one is a tombstone. - // (Remove the old entry, as the tombstone makes it obsolete). - self.stats.insertions.fetch_add(1, Ordering::Relaxed); + false } else { + // Something is wrong, I can feel it ... self.stats.replacements.fetch_add(1, Ordering::Relaxed); - return (true, existing.clone()); + true } - *existing = entry.clone(); - return (false, entry); + } + Err(index) => { + self.stats.insertions.fetch_add(1, Ordering::Relaxed); + slot_versions.insert(index, entry.clone()); + false } } - self.stats.insertions.fetch_add(1, Ordering::Relaxed); - slot_versions.insert(index.unwrap_or(slot_versions.len()), entry.clone()); - (false, entry) - } - - /// Assign the program `entry` to the given `key` in the cache. - /// This is typically called when a deployed program is managed (un-/re-/deployed) via - /// loader instructions. Because of the cooldown, entires can not have the same - /// deployment_slot and effective_slot. - pub fn assign_program(&mut self, key: Pubkey, entry: Arc) -> Arc { - let (was_occupied, entry) = self.replenish(key, entry); - debug_assert!(!was_occupied); - entry } pub fn prune_by_deployment_slot(&mut self, slot: Slot) { @@ -978,7 +961,7 @@ impl LoadedPrograms { pub fn merge(&mut self, tx_batch_cache: &LoadedProgramsForTxBatch) { tx_batch_cache.entries.iter().for_each(|(key, entry)| { - self.replenish(*key, entry.clone()); + self.assign_program(*key, entry.clone()); }) } @@ -1241,7 +1224,9 @@ mod tests { slot: Slot, reason: LoadedProgramType, ) -> Arc { - cache.assign_program(key, Arc::new(LoadedProgram::new_tombstone(slot, reason))) + let program = Arc::new(LoadedProgram::new_tombstone(slot, reason)); + cache.assign_program(key, program.clone()); + program } fn insert_unloaded_program( @@ -1265,7 +1250,8 @@ mod tests { .to_unloaded() .expect("Failed to unload the program"), ); - cache.replenish(key, unloaded).1 + cache.assign_program(key, unloaded.clone()); + unloaded } fn num_matching_entries(cache: &LoadedPrograms, predicate: P) -> usize @@ -1332,7 +1318,7 @@ mod tests { .enumerate() .for_each(|(i, deployment_slot)| { let usage_counter = *program1_usage_counters.get(i).unwrap_or(&0); - cache.replenish( + cache.assign_program( program1, new_test_loaded_program_with_usage( *deployment_slot, @@ -1365,7 +1351,7 @@ mod tests { .enumerate() .for_each(|(i, deployment_slot)| { let usage_counter = *program2_usage_counters.get(i).unwrap_or(&0); - cache.replenish( + cache.assign_program( program2, new_test_loaded_program_with_usage( *deployment_slot, @@ -1397,7 +1383,7 @@ mod tests { .enumerate() .for_each(|(i, deployment_slot)| { let usage_counter = *program3_usage_counters.get(i).unwrap_or(&0); - cache.replenish( + cache.assign_program( program3, new_test_loaded_program_with_usage( *deployment_slot, @@ -1479,7 +1465,7 @@ mod tests { .enumerate() .for_each(|(i, deployment_slot)| { let usage_counter = *program1_usage_counters.get(i).unwrap_or(&0); - cache.replenish( + cache.assign_program( program1, new_test_loaded_program_with_usage( *deployment_slot, @@ -1512,7 +1498,7 @@ mod tests { .enumerate() .for_each(|(i, deployment_slot)| { let usage_counter = *program2_usage_counters.get(i).unwrap_or(&0); - cache.replenish( + cache.assign_program( program2, new_test_loaded_program_with_usage( *deployment_slot, @@ -1544,7 +1530,7 @@ mod tests { .enumerate() .for_each(|(i, deployment_slot)| { let usage_counter = *program3_usage_counters.get(i).unwrap_or(&0); - cache.replenish( + cache.assign_program( program3, new_test_loaded_program_with_usage( *deployment_slot, @@ -1637,7 +1623,7 @@ mod tests { let program = Pubkey::new_unique(); let num_total_programs = 6; (0..num_total_programs).for_each(|i| { - cache.replenish( + cache.assign_program( program, new_test_loaded_program_with_usage(i, i + 2, AtomicU64::new(i + 10)), ); @@ -1664,7 +1650,7 @@ mod tests { // Replenish the program that was just unloaded. Use 0 as the usage counter. This should be // updated with the usage counter from the unloaded program. - cache.replenish( + cache.assign_program( program, new_test_loaded_program_with_usage(0, 2, AtomicU64::new(0)), ); @@ -1683,21 +1669,63 @@ mod tests { } #[test] - fn test_replace_tombstones() { + fn test_fuzz_assign_program_order() { + use rand::prelude::SliceRandom; + const EXPECTED_ENTRIES: [(u64, u64); 7] = + [(1, 2), (5, 5), (5, 6), (5, 10), (9, 10), (10, 10), (3, 12)]; + let mut rng = rand::thread_rng(); + let program_id = Pubkey::new_unique(); + for _ in 0..1000 { + let mut entries = EXPECTED_ENTRIES.to_vec(); + entries.shuffle(&mut rng); + let mut cache = new_mock_cache::(); + for (deployment_slot, effective_slot) in entries { + assert!(!cache.assign_program( + program_id, + new_test_loaded_program(deployment_slot, effective_slot) + )); + } + for ((deployment_slot, effective_slot), entry) in EXPECTED_ENTRIES + .iter() + .zip(cache.entries.get(&program_id).unwrap().slot_versions.iter()) + { + assert_eq!(entry.deployment_slot, *deployment_slot); + assert_eq!(entry.effective_slot, *effective_slot); + } + } + } + + #[test] + fn test_assign_program_tombstones() { let mut cache = new_mock_cache::(); let program1 = Pubkey::new_unique(); - let env = Arc::new(BuiltinProgram::new_mock()); + let env = cache.environments.program_runtime_v1.clone(); + set_tombstone( &mut cache, program1, 10, - LoadedProgramType::FailedVerification(env), + LoadedProgramType::FailedVerification(env.clone()), ); + assert_eq!(cache.entries.get(&program1).unwrap().slot_versions.len(), 1); + set_tombstone(&mut cache, program1, 10, LoadedProgramType::Closed); + assert_eq!(cache.entries.get(&program1).unwrap().slot_versions.len(), 1); + set_tombstone( + &mut cache, + program1, + 10, + LoadedProgramType::FailedVerification(env.clone()), + ); + assert_eq!(cache.entries.get(&program1).unwrap().slot_versions.len(), 1); - let loaded_program = new_test_loaded_program(10, 10); - let (existing, program) = cache.replenish(program1, loaded_program.clone()); - assert!(!existing); - assert_eq!(program, loaded_program); + // Fail on exact replacement + assert!(cache.assign_program( + program1, + Arc::new(LoadedProgram::new_tombstone( + 10, + LoadedProgramType::FailedVerification(env) + )) + )); } #[test] @@ -1735,11 +1763,7 @@ mod tests { // Add a program at slot 50, and a tombstone for the program at slot 60 let program2 = Pubkey::new_unique(); - assert!( - !cache - .replenish(program2, new_test_builtin_program(50, 51)) - .0 - ); + cache.assign_program(program2, new_test_builtin_program(50, 51)); let second_level = &cache .entries .get(&program2) @@ -1839,10 +1863,7 @@ mod tests { cache.set_fork_graph(fork_graph); let program1 = Pubkey::new_unique(); - let loaded_program = new_test_loaded_program(10, 10); - let (existing, program) = cache.replenish(program1, loaded_program.clone()); - assert!(!existing); - assert_eq!(program, loaded_program); + cache.assign_program(program1, new_test_loaded_program(10, 10)); let new_env = Arc::new(BuiltinProgram::new_mock()); cache.upcoming_environments = Some(ProgramRuntimeEnvironments { @@ -1859,9 +1880,7 @@ mod tests { ix_usage_counter: AtomicU64::default(), latest_access_slot: AtomicU64::default(), }); - let (existing, program) = cache.replenish(program1, updated_program.clone()); - assert!(!existing); - assert_eq!(program, updated_program); + cache.assign_program(program1, updated_program.clone()); // Test that there are 2 entries for the program assert_eq!( @@ -1996,38 +2015,27 @@ mod tests { cache.set_fork_graph(fork_graph); let program1 = Pubkey::new_unique(); - assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0); - assert!(!cache.replenish(program1, new_test_loaded_program(10, 11)).0); - assert!(!cache.replenish(program1, new_test_loaded_program(20, 21)).0); - - // Test: inserting duplicate entry return pre existing entry from the cache - assert!(cache.replenish(program1, new_test_loaded_program(20, 21)).0); + cache.assign_program(program1, new_test_loaded_program(0, 1)); + cache.assign_program(program1, new_test_loaded_program(10, 11)); + cache.assign_program(program1, new_test_loaded_program(20, 21)); let program2 = Pubkey::new_unique(); - assert!(!cache.replenish(program2, new_test_loaded_program(5, 6)).0); - assert!( - !cache - .replenish( - program2, - new_test_loaded_program(11, 11 + DELAY_VISIBILITY_SLOT_OFFSET) - ) - .0 + cache.assign_program(program2, new_test_loaded_program(5, 6)); + cache.assign_program( + program2, + new_test_loaded_program(11, 11 + DELAY_VISIBILITY_SLOT_OFFSET), ); let program3 = Pubkey::new_unique(); - assert!(!cache.replenish(program3, new_test_loaded_program(25, 26)).0); + cache.assign_program(program3, new_test_loaded_program(25, 26)); let program4 = Pubkey::new_unique(); - assert!(!cache.replenish(program4, new_test_loaded_program(0, 1)).0); - assert!(!cache.replenish(program4, new_test_loaded_program(5, 6)).0); + cache.assign_program(program4, new_test_loaded_program(0, 1)); + cache.assign_program(program4, new_test_loaded_program(5, 6)); // The following is a special case, where effective slot is 3 slots in the future - assert!( - !cache - .replenish( - program4, - new_test_loaded_program(15, 15 + DELAY_VISIBILITY_SLOT_OFFSET) - ) - .0 + cache.assign_program( + program4, + new_test_loaded_program(15, 15 + DELAY_VISIBILITY_SLOT_OFFSET), ); // Current fork graph @@ -2152,7 +2160,7 @@ mod tests { ix_usage_counter: AtomicU64::default(), latest_access_slot: AtomicU64::default(), }); - assert!(!cache.replenish(program4, test_program).0); + assert!(!cache.assign_program(program4, test_program)); // Testing fork 0 - 5 - 11 - 15 - 16 - 19 - 21 - 23 with current slot at 19 let mut missing = vec![ @@ -2305,15 +2313,15 @@ mod tests { cache.set_fork_graph(fork_graph); let program1 = Pubkey::new_unique(); - assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0); - assert!(!cache.replenish(program1, new_test_loaded_program(20, 21)).0); + cache.assign_program(program1, new_test_loaded_program(0, 1)); + cache.assign_program(program1, new_test_loaded_program(20, 21)); let program2 = Pubkey::new_unique(); - assert!(!cache.replenish(program2, new_test_loaded_program(5, 6)).0); - assert!(!cache.replenish(program2, new_test_loaded_program(11, 12)).0); + cache.assign_program(program2, new_test_loaded_program(5, 6)); + cache.assign_program(program2, new_test_loaded_program(11, 12)); let program3 = Pubkey::new_unique(); - assert!(!cache.replenish(program3, new_test_loaded_program(25, 26)).0); + cache.assign_program(program3, new_test_loaded_program(25, 26)); // Testing fork 0 - 5 - 11 - 15 - 16 - 19 - 21 - 23 with current slot at 19 let mut missing = vec![ @@ -2378,12 +2386,12 @@ mod tests { cache.set_fork_graph(fork_graph); let program1 = Pubkey::new_unique(); - assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0); - assert!(!cache.replenish(program1, new_test_loaded_program(20, 21)).0); + cache.assign_program(program1, new_test_loaded_program(0, 1)); + cache.assign_program(program1, new_test_loaded_program(20, 21)); let program2 = Pubkey::new_unique(); - assert!(!cache.replenish(program2, new_test_loaded_program(5, 6)).0); - assert!(!cache.replenish(program2, new_test_loaded_program(11, 12)).0); + cache.assign_program(program2, new_test_loaded_program(5, 6)); + cache.assign_program(program2, new_test_loaded_program(11, 12)); let program3 = Pubkey::new_unique(); // Insert an unloaded program with correct/cache's environment at slot 25 @@ -2392,17 +2400,13 @@ mod tests { // Insert another unloaded program with a different environment at slot 20 // Since this entry's environment won't match cache's environment, looking up this // entry should return missing instead of unloaded entry. - assert!( - !cache - .replenish( - program3, - Arc::new( - new_test_loaded_program(20, 21) - .to_unloaded() - .expect("Failed to create unloaded program") - ) - ) - .0 + cache.assign_program( + program3, + Arc::new( + new_test_loaded_program(20, 21) + .to_unloaded() + .expect("Failed to create unloaded program"), + ), ); // Testing fork 0 - 5 - 11 - 15 - 16 - 19 - 21 - 23 with current slot at 19 @@ -2475,15 +2479,15 @@ mod tests { cache.set_fork_graph(fork_graph); let program1 = Pubkey::new_unique(); - assert!(!cache.replenish(program1, new_test_loaded_program(10, 11)).0); - assert!(!cache.replenish(program1, new_test_loaded_program(20, 21)).0); + assert!(!cache.assign_program(program1, new_test_loaded_program(10, 11))); + assert!(!cache.assign_program(program1, new_test_loaded_program(20, 21))); let program2 = Pubkey::new_unique(); - assert!(!cache.replenish(program2, new_test_loaded_program(5, 6)).0); - assert!(!cache.replenish(program2, new_test_loaded_program(11, 12)).0); + assert!(!cache.assign_program(program2, new_test_loaded_program(5, 6))); + assert!(!cache.assign_program(program2, new_test_loaded_program(11, 12))); let program3 = Pubkey::new_unique(); - assert!(!cache.replenish(program3, new_test_loaded_program(25, 26)).0); + assert!(!cache.assign_program(program3, new_test_loaded_program(25, 26))); // The following is a special case, where there's an expiration slot let test_program = Arc::new(LoadedProgram { @@ -2496,7 +2500,7 @@ mod tests { ix_usage_counter: AtomicU64::default(), latest_access_slot: AtomicU64::default(), }); - assert!(!cache.replenish(program1, test_program).0); + assert!(!cache.assign_program(program1, test_program)); // Testing fork 0 - 5 - 11 - 15 - 16 - 19 - 21 - 23 with current slot at 19 let mut missing = vec![ @@ -2580,8 +2584,8 @@ mod tests { cache.set_fork_graph(fork_graph); let program1 = Pubkey::new_unique(); - assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0); - assert!(!cache.replenish(program1, new_test_loaded_program(5, 6)).0); + cache.assign_program(program1, new_test_loaded_program(0, 1)); + cache.assign_program(program1, new_test_loaded_program(5, 6)); cache.prune(10, 0); @@ -2620,11 +2624,11 @@ mod tests { cache.set_fork_graph(fork_graph); let program1 = Pubkey::new_unique(); - assert!(!cache.replenish(program1, new_test_loaded_program(0, 1)).0); - assert!(!cache.replenish(program1, new_test_loaded_program(5, 6)).0); + cache.assign_program(program1, new_test_loaded_program(0, 1)); + cache.assign_program(program1, new_test_loaded_program(5, 6)); let program2 = Pubkey::new_unique(); - assert!(!cache.replenish(program2, new_test_loaded_program(10, 11)).0); + cache.assign_program(program2, new_test_loaded_program(10, 11)); let mut missing = vec![ (program1, (LoadedProgramMatchCriteria::NoCriteria, 1)), diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index c96a109a14aa75..1a4dbaca38c688 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1377,7 +1377,7 @@ impl Bank { drop(loaded_programs_cache); let recompiled = new.load_program(&key, false, Some(program_to_recompile)); let mut loaded_programs_cache = new.loaded_programs_cache.write().unwrap(); - loaded_programs_cache.replenish(key, recompiled); + loaded_programs_cache.assign_program(key, recompiled); } } else if new.epoch() != loaded_programs_cache.latest_root_epoch || slot_index.saturating_add(slots_in_recompilation_phase) >= slots_in_epoch @@ -7687,7 +7687,7 @@ impl Bank { self.loaded_programs_cache .write() .unwrap() - .replenish(program_id, Arc::new(builtin)); + .assign_program(program_id, Arc::new(builtin)); debug!("Added program {} under {:?}", name, program_id); }