Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-aptos committed Jul 18, 2024
1 parent fee3974 commit 54ee359
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 25 deletions.
8 changes: 5 additions & 3 deletions mempool/src/shared_mempool/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,9 +555,11 @@ pub(crate) fn process_committed_transactions(
let mut pool = mempool.lock();
let block_timestamp = Duration::from_micros(block_timestamp_usecs);

let tracking_usecases = use_case_history
.lock()
.update_usecases_and_get_tracking_set(&transactions);
let tracking_usecases = {
let mut history = use_case_history.lock();
history.update_usecases(&transactions);
history.compute_tracking_set()
};

for transaction in transactions {
pool.log_commit_transaction(
Expand Down
158 changes: 137 additions & 21 deletions mempool/src/shared_mempool/use_case_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@

use aptos_mempool_notifications::CommittedTransaction;
use aptos_types::transaction::use_case::UseCaseKey;
use std::collections::{HashMap, HashSet, VecDeque};
use std::{
cmp::Ordering,
collections::{BinaryHeap, HashMap, HashSet, VecDeque},
};

pub(crate) struct UseCaseHistory {
window_size: usize,
num_top_to_track: usize,
recent: VecDeque<HashMap<UseCaseKey, usize>>,
total: HashMap<UseCaseKey, usize>,
}

impl UseCaseHistory {
Expand All @@ -17,47 +21,159 @@ impl UseCaseHistory {
window_size,
num_top_to_track,
recent: VecDeque::with_capacity(window_size + 1),
total: HashMap::new(),
}
}

pub(crate) fn update_usecases_and_get_tracking_set(
&mut self,
transactions: &[CommittedTransaction],
) -> HashSet<UseCaseKey> {
let mut count_by_usecase = HashMap::new();
for transaction in transactions {
*count_by_usecase
.entry(transaction.use_case.clone())
.or_insert(0) += 1;
fn add_to_recent(&mut self, count_by_usecase: HashMap<UseCaseKey, usize>) {
for (use_case, count) in &count_by_usecase {
assert!(*count > 0);
*self.total.entry(use_case.clone()).or_insert(0) += *count;
}

self.recent.push_back(count_by_usecase);

while self.recent.len() > self.window_size {
self.recent.pop_front();
}
let to_remove = self.recent.pop_front().expect("non-empty after size check");
for (use_case, count) in to_remove {
assert!(count > 0);

let mut total = HashMap::new();
for group in &self.recent {
for (use_case, count) in group {
if use_case != &UseCaseKey::Platform && use_case != &UseCaseKey::Others {
*total.entry(use_case.clone()).or_insert(0) += count;
use std::collections::hash_map::Entry;
match self.total.entry(use_case) {
Entry::Occupied(mut o) => {
assert!(*o.get() >= count);
*o.get_mut() -= count;
if *o.get() == 0 {
o.remove_entry();
}
},
Entry::Vacant(e) => {
panic!("Entry present in recent cannot be missing in total {:?}", e)
},
}
}
}
}

pub fn compute_tracking_set(&self) -> HashSet<UseCaseKey> {
let mut result = HashSet::new();
result.insert(UseCaseKey::Platform);
result.insert(UseCaseKey::Others);

let mut sorted = total.into_iter().collect::<Vec<_>>();
sorted.sort_by_key(|(_, count)| *count);
let mut max_heap: BinaryHeap<UseCaseByCount> = self
.total
.iter()
.filter(|(use_case, _)| {
*use_case != &UseCaseKey::Platform && *use_case != &UseCaseKey::Others
})
.map(|(use_case, count)| UseCaseByCount {
use_case: use_case.clone(),
count: *count,
})
.collect::<BinaryHeap<_>>();

for _ in 0..self.num_top_to_track {
if let Some((use_case, _)) = sorted.pop() {
if let Some(UseCaseByCount { use_case, .. }) = max_heap.pop() {
result.insert(use_case);
}
}

result
}

pub(crate) fn update_usecases(&mut self, transactions: &[CommittedTransaction]) {
let mut count_by_usecase = HashMap::new();
for transaction in transactions {
*count_by_usecase
.entry(transaction.use_case.clone())
.or_insert(0) += 1;
}

self.add_to_recent(count_by_usecase);
}
}

#[derive(Eq, PartialEq)]
struct UseCaseByCount {
use_case: UseCaseKey,
count: usize,
}

impl Ord for UseCaseByCount {
fn cmp(&self, other: &Self) -> Ordering {
self.count.cmp(&other.count)
}
}

impl PartialOrd for UseCaseByCount {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

#[test]
fn test_use_case_history() {
use aptos_types::account_address::AccountAddress;
let ct = |use_case: UseCaseKey| -> CommittedTransaction {
CommittedTransaction {
sender: AccountAddress::ONE,
sequence_number: 0,
use_case,
}
};

let usecase1 = ContractAddress(AccountAddress::random());
let usecase2 = ContractAddress(AccountAddress::random());
let usecase3 = ContractAddress(AccountAddress::random());

use UseCaseKey::*;

let mut history = UseCaseHistory::new(2, 2);

history.update_usecases(&[ct(Platform)]);
assert_eq!(
history.compute_tracking_set(),
HashSet::from([Platform, Others])
);

history.update_usecases(&[
ct(Platform),
ct(Platform),
ct(usecase1.clone()),
ct(usecase1.clone()),
ct(usecase2.clone()),
]);
assert_eq!(
history.compute_tracking_set(),
HashSet::from([Platform, Others, usecase1.clone(), usecase2.clone()]),
);

history.update_usecases(&[
ct(usecase1.clone()),
ct(usecase2.clone()),
ct(usecase3.clone()),
]);
assert_eq!(
history.compute_tracking_set(),
HashSet::from([Platform, Others, usecase1.clone(), usecase2.clone()]),
);

history.update_usecases(&[ct(usecase2.clone()), ct(usecase3.clone())]);
assert_eq!(
history.compute_tracking_set(),
HashSet::from([Platform, Others, usecase2.clone(), usecase3.clone()]),
);

history.update_usecases(&[
ct(usecase2.clone()),
ct(usecase3.clone()),
ct(ContractAddress(AccountAddress::random())),
ct(ContractAddress(AccountAddress::random())),
ct(ContractAddress(AccountAddress::random())),
ct(ContractAddress(AccountAddress::random())),
ct(ContractAddress(AccountAddress::random())),
]);
assert_eq!(
history.compute_tracking_set(),
HashSet::from([Platform, Others, usecase2.clone(), usecase3.clone()]),
);
}
1 change: 0 additions & 1 deletion types/src/transaction/use_case.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ impl UseCaseAwareTransaction for SignedTransaction {
if module_id.address().is_special() {
Platform
} else {
// n.b. Generics ignored.
ContractAddress(*module_id.address())
}
},
Expand Down

0 comments on commit 54ee359

Please sign in to comment.