From 54ee359da6a2dd140435494c5515497e0ff5dfed Mon Sep 17 00:00:00 2001 From: Igor Date: Thu, 18 Jul 2024 15:01:19 -0700 Subject: [PATCH] addressing comments --- mempool/src/shared_mempool/tasks.rs | 8 +- .../src/shared_mempool/use_case_history.rs | 158 +++++++++++++++--- types/src/transaction/use_case.rs | 1 - 3 files changed, 142 insertions(+), 25 deletions(-) diff --git a/mempool/src/shared_mempool/tasks.rs b/mempool/src/shared_mempool/tasks.rs index a19287b855243..c5fc3a23a7531 100644 --- a/mempool/src/shared_mempool/tasks.rs +++ b/mempool/src/shared_mempool/tasks.rs @@ -555,9 +555,11 @@ pub(crate) fn process_committed_transactions( let mut pool = mempool.lock(); let block_timestamp = Duration::from_micros(block_timestamp_usecs); - let tracking_usecases = use_case_history - .lock() - .update_usecases_and_get_tracking_set(&transactions); + let tracking_usecases = { + let mut history = use_case_history.lock(); + history.update_usecases(&transactions); + history.compute_tracking_set() + }; for transaction in transactions { pool.log_commit_transaction( diff --git a/mempool/src/shared_mempool/use_case_history.rs b/mempool/src/shared_mempool/use_case_history.rs index 85882a5264d76..c4c3d428de237 100644 --- a/mempool/src/shared_mempool/use_case_history.rs +++ b/mempool/src/shared_mempool/use_case_history.rs @@ -3,12 +3,16 @@ use aptos_mempool_notifications::CommittedTransaction; use aptos_types::transaction::use_case::UseCaseKey; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::{ + cmp::Ordering, + collections::{BinaryHeap, HashMap, HashSet, VecDeque}, +}; pub(crate) struct UseCaseHistory { window_size: usize, num_top_to_track: usize, recent: VecDeque>, + total: HashMap, } impl UseCaseHistory { @@ -17,47 +21,159 @@ impl UseCaseHistory { window_size, num_top_to_track, recent: VecDeque::with_capacity(window_size + 1), + total: HashMap::new(), } } - pub(crate) fn update_usecases_and_get_tracking_set( - &mut self, - transactions: &[CommittedTransaction], - ) -> HashSet { - let mut count_by_usecase = HashMap::new(); - for transaction in transactions { - *count_by_usecase - .entry(transaction.use_case.clone()) - .or_insert(0) += 1; + fn add_to_recent(&mut self, count_by_usecase: HashMap) { + for (use_case, count) in &count_by_usecase { + assert!(*count > 0); + *self.total.entry(use_case.clone()).or_insert(0) += *count; } - self.recent.push_back(count_by_usecase); + while self.recent.len() > self.window_size { - self.recent.pop_front(); - } + let to_remove = self.recent.pop_front().expect("non-empty after size check"); + for (use_case, count) in to_remove { + assert!(count > 0); - let mut total = HashMap::new(); - for group in &self.recent { - for (use_case, count) in group { - if use_case != &UseCaseKey::Platform && use_case != &UseCaseKey::Others { - *total.entry(use_case.clone()).or_insert(0) += count; + use std::collections::hash_map::Entry; + match self.total.entry(use_case) { + Entry::Occupied(mut o) => { + assert!(*o.get() >= count); + *o.get_mut() -= count; + if *o.get() == 0 { + o.remove_entry(); + } + }, + Entry::Vacant(e) => { + panic!("Entry present in recent cannot be missing in total {:?}", e) + }, } } } + } + pub fn compute_tracking_set(&self) -> HashSet { let mut result = HashSet::new(); result.insert(UseCaseKey::Platform); result.insert(UseCaseKey::Others); - let mut sorted = total.into_iter().collect::>(); - sorted.sort_by_key(|(_, count)| *count); + let mut max_heap: BinaryHeap = self + .total + .iter() + .filter(|(use_case, _)| { + *use_case != &UseCaseKey::Platform && *use_case != &UseCaseKey::Others + }) + .map(|(use_case, count)| UseCaseByCount { + use_case: use_case.clone(), + count: *count, + }) + .collect::>(); for _ in 0..self.num_top_to_track { - if let Some((use_case, _)) = sorted.pop() { + if let Some(UseCaseByCount { use_case, .. }) = max_heap.pop() { result.insert(use_case); } } result } + + pub(crate) fn update_usecases(&mut self, transactions: &[CommittedTransaction]) { + let mut count_by_usecase = HashMap::new(); + for transaction in transactions { + *count_by_usecase + .entry(transaction.use_case.clone()) + .or_insert(0) += 1; + } + + self.add_to_recent(count_by_usecase); + } +} + +#[derive(Eq, PartialEq)] +struct UseCaseByCount { + use_case: UseCaseKey, + count: usize, +} + +impl Ord for UseCaseByCount { + fn cmp(&self, other: &Self) -> Ordering { + self.count.cmp(&other.count) + } +} + +impl PartialOrd for UseCaseByCount { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +#[test] +fn test_use_case_history() { + use aptos_types::account_address::AccountAddress; + let ct = |use_case: UseCaseKey| -> CommittedTransaction { + CommittedTransaction { + sender: AccountAddress::ONE, + sequence_number: 0, + use_case, + } + }; + + let usecase1 = ContractAddress(AccountAddress::random()); + let usecase2 = ContractAddress(AccountAddress::random()); + let usecase3 = ContractAddress(AccountAddress::random()); + + use UseCaseKey::*; + + let mut history = UseCaseHistory::new(2, 2); + + history.update_usecases(&[ct(Platform)]); + assert_eq!( + history.compute_tracking_set(), + HashSet::from([Platform, Others]) + ); + + history.update_usecases(&[ + ct(Platform), + ct(Platform), + ct(usecase1.clone()), + ct(usecase1.clone()), + ct(usecase2.clone()), + ]); + assert_eq!( + history.compute_tracking_set(), + HashSet::from([Platform, Others, usecase1.clone(), usecase2.clone()]), + ); + + history.update_usecases(&[ + ct(usecase1.clone()), + ct(usecase2.clone()), + ct(usecase3.clone()), + ]); + assert_eq!( + history.compute_tracking_set(), + HashSet::from([Platform, Others, usecase1.clone(), usecase2.clone()]), + ); + + history.update_usecases(&[ct(usecase2.clone()), ct(usecase3.clone())]); + assert_eq!( + history.compute_tracking_set(), + HashSet::from([Platform, Others, usecase2.clone(), usecase3.clone()]), + ); + + history.update_usecases(&[ + ct(usecase2.clone()), + ct(usecase3.clone()), + ct(ContractAddress(AccountAddress::random())), + ct(ContractAddress(AccountAddress::random())), + ct(ContractAddress(AccountAddress::random())), + ct(ContractAddress(AccountAddress::random())), + ct(ContractAddress(AccountAddress::random())), + ]); + assert_eq!( + history.compute_tracking_set(), + HashSet::from([Platform, Others, usecase2.clone(), usecase3.clone()]), + ); } diff --git a/types/src/transaction/use_case.rs b/types/src/transaction/use_case.rs index 110f8490f996e..ee72a61b5d964 100644 --- a/types/src/transaction/use_case.rs +++ b/types/src/transaction/use_case.rs @@ -46,7 +46,6 @@ impl UseCaseAwareTransaction for SignedTransaction { if module_id.address().is_special() { Platform } else { - // n.b. Generics ignored. ContractAddress(*module_id.address()) } },