diff --git a/Cargo.lock b/Cargo.lock index 4eb704676922b..2792dcb5c6433 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -881,6 +881,7 @@ dependencies = [ "claims", "clap 4.4.14", "dashmap", + "derivative", "enum_dispatch", "fail", "futures", @@ -898,6 +899,7 @@ dependencies = [ "once_cell", "ordered-float 3.9.2", "proptest", + "proptest-derive", "rand 0.7.3", "rayon", "scopeguard", @@ -6685,12 +6687,12 @@ dependencies = [ [[package]] name = "darling" -version = "0.20.3" +version = "0.20.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" +checksum = "83b2eb4d90d12bdda5ed17de686c2acb4c57914f8f921b8da7e112b5a36f3fe1" dependencies = [ - "darling_core 0.20.3", - "darling_macro 0.20.3", + "darling_core 0.20.9", + "darling_macro 0.20.9", ] [[package]] @@ -6723,15 +6725,15 @@ dependencies = [ [[package]] name = "darling_core" -version = "0.20.3" +version = "0.20.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "177e3443818124b357d8e76f53be906d60937f0d3a90773a664fa63fa253e621" +checksum = "622687fe0bac72a04e5599029151f5796111b90f1baaa9b544d807a5e31cd120" dependencies = [ "fnv", "ident_case", "proc-macro2", "quote", - "strsim 0.10.0", + "strsim 0.11.1", "syn 2.0.48", ] @@ -6759,11 +6761,11 @@ dependencies = [ [[package]] name = "darling_macro" -version = "0.20.3" +version = "0.20.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" +checksum = "733cabb43482b1a1b53eee8583c2b9e8684d592215ea83efd305dd31bc2f0178" dependencies = [ - "darling_core 0.20.3", + "darling_core 0.20.9", "quote", "syn 2.0.48", ] @@ -10053,9 +10055,9 @@ dependencies = [ [[package]] name = "lz4" -version = "1.24.0" +version = "1.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e9e2dd86df36ce760a60f6ff6ad526f7ba1f14ba0356f8254fb6905e6494df1" +checksum = "d6eab492fe7f8651add23237ea56dbf11b3c4ff762ab83d40a47f11433421f91" dependencies = [ "libc", "lz4-sys", @@ -10063,9 +10065,9 @@ dependencies = [ [[package]] name = "lz4-sys" -version = "1.9.4" +version = "1.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57d27b317e207b10f69f5e75494119e391a96f48861ae870d1da6edac98ca900" +checksum = "e9764018d143cc854c9f17f0b907de70f14393b1f502da6375dce70f00514eb3" dependencies = [ "cc", "libc", @@ -14365,7 +14367,7 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93634eb5f75a2323b16de4748022ac4297f9e76b6dced2be287a099f41b5e788" dependencies = [ - "darling 0.20.3", + "darling 0.20.9", "proc-macro2", "quote", "syn 2.0.48", @@ -14972,6 +14974,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "structopt" version = "0.3.26" diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 4d6f00ed82159..f72173e43f5a6 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -56,6 +56,7 @@ chrono = { workspace = true } claims = { workspace = true } clap = { workspace = true } dashmap = { workspace = true } +derivative = { workspace = true } enum_dispatch = { workspace = true } fail = { workspace = true } futures = { workspace = true } @@ -100,6 +101,7 @@ claims = { workspace = true } mockall = { workspace = true } move-core-types = { workspace = true } proptest = { workspace = true } +proptest-derive = { workspace = true } tempfile = { workspace = true } [features] diff --git a/consensus/src/transaction_shuffler/mod.rs b/consensus/src/transaction_shuffler/mod.rs index bb1bfdc311c87..a75ce60e0cdd6 100644 --- a/consensus/src/transaction_shuffler/mod.rs +++ b/consensus/src/transaction_shuffler/mod.rs @@ -8,6 +8,7 @@ use std::sync::Arc; mod deprecated_fairness; mod sender_aware; +mod use_case_aware; /// Interface to shuffle transactions pub trait TransactionShuffler: Send + Sync { @@ -61,5 +62,21 @@ pub fn create_transaction_shuffler( entry_fun_conflict_window_size: entry_fun_conflict_window_size as usize, }) }, + UseCaseAware { + sender_spread_factor, + platform_use_case_spread_factor, + user_use_case_spread_factor, + } => { + let config = use_case_aware::Config { + sender_spread_factor, + platform_use_case_spread_factor, + user_use_case_spread_factor, + }; + info!( + config = ?config, + "Using use case aware transaction shuffling." + ); + Arc::new(use_case_aware::UseCaseAwareShuffler { config }) + }, } } diff --git a/consensus/src/transaction_shuffler/use_case_aware/delayed_queue.rs b/consensus/src/transaction_shuffler/use_case_aware/delayed_queue.rs new file mode 100644 index 0000000000000..c72a68f47ab9f --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/delayed_queue.rs @@ -0,0 +1,503 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::transaction_shuffler::use_case_aware::{ + types::{InputIdx, OutputIdx, UseCaseAwareTransaction, UseCaseKey}, + utils::StrictMap, + Config, +}; +use move_core_types::account_address::AccountAddress; +use std::{ + collections::{hash_map, BTreeMap, HashMap, VecDeque}, + fmt::Debug, +}; + +/// Key used in priority queues. +/// Part of the key is a txn's input index which guarantees in any priority queue, of use cases or +/// accounts, there are not two entries that share the same delay key. Also, when `try_delay_till` +/// is identical, an entry relating to a earlier txn is prioritized. +#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd)] +struct DelayKey { + try_delay_till: OutputIdx, + input_idx: InputIdx, +} + +impl DelayKey { + fn new(try_delay_till: OutputIdx, input_idx: InputIdx) -> Self { + Self { + try_delay_till, + input_idx, + } + } +} + +impl Debug for DelayKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "DelayKey({}, {})", self.try_delay_till, self.input_idx) + } +} + +struct TxnWithInputIdx { + input_idx: InputIdx, + txn: Txn, +} + +impl Debug for TxnWithInputIdx +where + Txn: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Txn({}: {:?})", self.input_idx, self.txn) + } +} + +#[derive(Debug)] +struct Account { + try_delay_till: OutputIdx, + /// Head txn input_idx, tracked for use when the txns queue is empty, in which case + /// it keeps the value before the last txn was dequeued. + input_idx: InputIdx, + txns: VecDeque>, +} + +impl Account +where + Txn: UseCaseAwareTransaction, +{ + fn new_with_txn(try_delay_till: OutputIdx, input_idx: InputIdx, txn: Txn) -> Self { + let txns = vec![TxnWithInputIdx { input_idx, txn }].into(); + Self { + try_delay_till, + input_idx, + txns, + } + } + + fn new_empty(try_delay_till: OutputIdx, input_idx: InputIdx) -> Self { + Self { + try_delay_till, + input_idx, + txns: VecDeque::new(), + } + } + + fn is_empty(&self) -> bool { + self.txns.is_empty() + } + + fn delay_key(&self) -> DelayKey { + DelayKey { + try_delay_till: self.try_delay_till, + input_idx: self.input_idx, + } + } + + fn expect_first_txn(&self) -> &TxnWithInputIdx { + self.txns.front().expect("Must exist.") + } + + fn expect_use_case_key(&self) -> UseCaseKey { + self.expect_first_txn().txn.parse_use_case() + } + + fn queue_txn(&mut self, input_idx: InputIdx, txn: Txn) { + if let Some(last_txn) = self.txns.back() { + assert!(last_txn.input_idx < input_idx); + } else { + self.input_idx = input_idx; + } + self.txns.push_back(TxnWithInputIdx { input_idx, txn }); + } + + fn expect_dequeue_txn(&mut self) -> TxnWithInputIdx { + let txn = self.txns.pop_front().expect("Must exist."); + if let Some(next_txn) = self.txns.front() { + self.input_idx = next_txn.input_idx; + } + txn + } + + fn update_try_delay_till(&mut self, try_delay_till: OutputIdx) { + self.try_delay_till = try_delay_till; + } +} + +#[derive(Debug)] +struct UseCase { + try_delay_till: OutputIdx, + /// Head account input_idx, tracked for use when the accounts queue is empty, in which case + /// it keeps the value before the last account was removed. + input_idx: InputIdx, + account_by_delay: BTreeMap, +} + +impl UseCase { + fn new_empty(try_delay_till: OutputIdx, input_idx: InputIdx) -> Self { + Self { + try_delay_till, + input_idx, + account_by_delay: BTreeMap::new(), + } + } + + fn new_with_account( + try_delay_till: OutputIdx, + address: AccountAddress, + account: &Account, + ) -> Self + where + Txn: UseCaseAwareTransaction, + { + let mut account_by_delay = BTreeMap::new(); + account_by_delay.strict_insert(account.delay_key(), address); + Self { + try_delay_till, + input_idx: account.input_idx, + account_by_delay, + } + } + + fn is_empty(&self) -> bool { + self.account_by_delay.is_empty() + } + + fn delay_key(&self) -> DelayKey { + // If head account will be ready later than the use case itself, respect that. + let try_delay_till = std::cmp::max( + self.try_delay_till, + self.account_by_delay + .first_key_value() + .map_or(0, |(k, _)| k.try_delay_till), + ); + + DelayKey { + try_delay_till, + input_idx: self.input_idx, + } + } + + fn expect_pop_head_account(&mut self) -> (DelayKey, AccountAddress) { + let (account_delay_key, address) = self.account_by_delay.pop_first().expect("Must exist."); + if let Some((next_account_delay_key, _)) = self.account_by_delay.first_key_value() { + self.input_idx = next_account_delay_key.input_idx; + } + (account_delay_key, address) + } + + fn update_try_delay_till(&mut self, try_delay_till: OutputIdx) { + self.try_delay_till = try_delay_till; + } + + fn add_account(&mut self, address: AccountAddress, account: &Account) + where + Txn: UseCaseAwareTransaction, + { + let account_delay_key = account.delay_key(); + self.account_by_delay + .strict_insert(account_delay_key, address); + let (_, head_address) = self.account_by_delay.first_key_value().unwrap(); + if head_address == &address { + self.input_idx = account_delay_key.input_idx; + } + } +} + +#[derive(Debug, Default)] +pub(crate) struct DelayedQueue { + accounts: HashMap>, + use_cases: HashMap, + + /// empty accounts (those w/o known delayed txns), kept to track the delay + account_placeholders_by_delay: BTreeMap, + /// empty use cases (those w/o known delayed txns), kept to track the delay + use_case_placeholders_by_delay: BTreeMap, + + /// main delay queue, all use cases are non-empty of non-empty accounts + use_cases_by_delay: BTreeMap, + + /// externally set output index; when an item has try_delay_till <= output_idx, it's deemed ready + output_idx: OutputIdx, + + config: Config, +} + +impl DelayedQueue +where + Txn: UseCaseAwareTransaction, +{ + pub fn new(config: Config) -> Self { + Self { + accounts: HashMap::new(), + use_cases: HashMap::new(), + + account_placeholders_by_delay: BTreeMap::new(), + use_case_placeholders_by_delay: BTreeMap::new(), + + use_cases_by_delay: BTreeMap::new(), + + output_idx: 0, + + config, + } + } + + /// Remove stale (empty use cases and accounts with try_delay_till <= self.output_idx) placeholders. + fn drain_placeholders(&mut self) { + let least_to_keep = DelayKey::new(self.output_idx + 1, 0); + + let remaining_use_case_placeholders = self + .use_case_placeholders_by_delay + .split_off(&least_to_keep); + let remaining_account_placeholders = + self.account_placeholders_by_delay.split_off(&least_to_keep); + + self.use_case_placeholders_by_delay + .iter() + .for_each(|(_delay_key, use_case_key)| self.use_cases.strict_remove(use_case_key)); + self.account_placeholders_by_delay + .iter() + .for_each(|(_delay_key, address)| self.accounts.strict_remove(address)); + + self.use_case_placeholders_by_delay = remaining_use_case_placeholders; + self.account_placeholders_by_delay = remaining_account_placeholders; + } + + pub fn bump_output_idx(&mut self, output_idx: OutputIdx) { + self.output_idx = output_idx; + self.drain_placeholders(); + } + + pub fn pop_head(&mut self, only_if_ready: bool) -> Option { + // See if any delayed txn exists. If not, return None. + let use_case_entry = match self.use_cases_by_delay.first_entry() { + None => { + return None; + }, + Some(occupied_entry) => occupied_entry, + }; + let use_case_delay_key = use_case_entry.key(); + + // Check readiness. + if only_if_ready && use_case_delay_key.try_delay_till > self.output_idx { + return None; + } + + // Gonna return the front txn of the front account of the front use case. + + // First, both the use case and account need to be removed from the priority queues. + let use_case_delay_key = *use_case_delay_key; + let use_case_key = use_case_entry.remove(); + let use_case = self.use_cases.expect_mut(&use_case_key); + let (account_delay_key, address) = use_case.expect_pop_head_account(); + assert!(account_delay_key.try_delay_till <= use_case_delay_key.try_delay_till); + assert_eq!(account_delay_key.input_idx, use_case_delay_key.input_idx); + + // Pop first txn from account (for returning it later). + let account = self.accounts.expect_mut(&address); + let txn = account.expect_dequeue_txn(); + + // Update priorities. + account.update_try_delay_till(self.output_idx + 1 + self.config.sender_spread_factor()); + use_case.update_try_delay_till( + self.output_idx + 1 + self.config.use_case_spread_factor(&use_case_key), + ); + + // Add account and original use case back to delay queues. + + if account.is_empty() { + self.account_placeholders_by_delay + .strict_insert(account.delay_key(), address); + if use_case.is_empty() { + self.use_case_placeholders_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + } else { + self.use_cases_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + } + } else { + // See if account now belongs to a different use case. + let new_use_case_key = account.expect_use_case_key(); + if new_use_case_key == use_case_key { + use_case.add_account(address, account); + self.use_cases_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + } else { + // Account now belongs to a different use case. + + // Add original use case back to delay queue. + if use_case.is_empty() { + self.use_case_placeholders_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + } else { + self.use_cases_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + } + + // Add the account to the new use case. + match self.use_cases.entry(new_use_case_key.clone()) { + hash_map::Entry::Occupied(mut occupied_entry) => { + // Existing use case, remove from priority queues. + let new_use_case = occupied_entry.get_mut(); + if new_use_case.is_empty() { + self.use_case_placeholders_by_delay + .strict_remove(&new_use_case.delay_key()); + } else { + self.use_cases_by_delay + .strict_remove(&new_use_case.delay_key()); + } + // Add account to use case. + new_use_case.add_account(address, account); + // Add new use case back to delay queue. + self.use_cases_by_delay + .strict_insert(new_use_case.delay_key(), new_use_case_key.clone()); + }, + hash_map::Entry::Vacant(entry) => { + // Use case not tracked previously, try_delay_till = output_idx + 1 + let new_use_case = entry.insert(UseCase::new_with_account( + self.output_idx + 1, + address, + account, + )); + self.use_cases_by_delay + .strict_insert(new_use_case.delay_key(), new_use_case_key.clone()); + }, + } + } + } + + Some(txn.txn) + } + + /// Txn has to be delayed, attach it to respective account and use case. + fn queue_txn( + &mut self, + input_idx: InputIdx, + address: AccountAddress, + use_case_key: UseCaseKey, + txn: Txn, + ) { + match self.accounts.get_mut(&address) { + Some(account) => { + if account.is_empty() { + // Account placeholder exists, move it from the placeholder queue to the main queue. + self.account_placeholders_by_delay + .remove(&account.delay_key()); + account.queue_txn(input_idx, txn); + match self.use_cases.entry(use_case_key.clone()) { + hash_map::Entry::Occupied(occupied) => { + let use_case = occupied.into_mut(); + if use_case.is_empty() { + self.use_case_placeholders_by_delay + .strict_remove(&use_case.delay_key()); + } else { + self.use_cases_by_delay.strict_remove(&use_case.delay_key()); + } + use_case.add_account(address, account); + self.use_cases_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + }, + hash_map::Entry::Vacant(vacant) => { + // Use case not tracked previously, the use case is ready at the current + // output_idx, instead of output_idx +1 -- it makes a difference if + // a txn later in the input queue that's of the same use case but not + // blocked by account delay is tested for readiness. + let use_case = + UseCase::new_with_account(self.output_idx, address, account); + self.use_cases_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + vacant.insert(use_case); + }, + } + } else { + // Account tracked and not empty, so appending a new txn to it won't affect positions + // in delay queues + account.queue_txn(input_idx, txn); + } + }, + None => { + // Account not previously tracked. + let account = Account::new_with_txn(self.output_idx + 1, input_idx, txn); + // Account didn't exist before, so use case must have been tracked, otherwise the + // txn whould've been selected for output, bypassing the queue. + let use_case = self.use_cases.expect_mut(&use_case_key); + if use_case.is_empty() { + self.use_case_placeholders_by_delay + .strict_remove(&use_case.delay_key()); + } else { + self.use_cases_by_delay.strict_remove(&use_case.delay_key()); + } + use_case.add_account(address, &account); + + self.accounts.strict_insert(address, account); + self.use_cases_by_delay + .strict_insert(use_case.delay_key(), use_case_key.clone()); + }, + } + } + + /// Txn from input queue directly selected for output, needs to bump delays for relevant + /// account and use case. + fn update_delays_for_selected_txn( + &mut self, + input_idx: InputIdx, + address: AccountAddress, + use_case_key: UseCaseKey, + ) { + let account_try_delay_till = self.output_idx + 1 + self.config.sender_spread_factor(); + let use_case_try_delay_till = + self.output_idx + 1 + self.config.use_case_spread_factor(&use_case_key); + + match self.use_cases.entry(use_case_key.clone()) { + hash_map::Entry::Occupied(occupied) => { + let use_case = occupied.into_mut(); + // Txn wouldn't have been selected for output if the use case is empty (tracking + // for a try_delay_till > self.output_idx) + assert!(!use_case.is_empty()); + + self.use_cases_by_delay.strict_remove(&use_case.delay_key()); + use_case.update_try_delay_till(use_case_try_delay_till); + self.use_cases_by_delay + .strict_insert(use_case.delay_key(), use_case_key); + }, + hash_map::Entry::Vacant(vacant) => { + let use_case = UseCase::new_empty(use_case_try_delay_till, input_idx); + self.use_case_placeholders_by_delay + .strict_insert(use_case.delay_key(), use_case_key); + vacant.insert(use_case); + }, + } + + // Account must not have been tracked before, otherwise the txn wouldn't have been selected + // for output. + let new_account = Account::new_empty(account_try_delay_till, input_idx); + let new_account_delay_key = new_account.delay_key(); + self.accounts.strict_insert(address, new_account); + self.account_placeholders_by_delay + .strict_insert(new_account_delay_key, address); + } + + /// Return the txn back if relevant use case and sender are not subject to delaying. Otherwise, + /// Queue it up. + pub fn queue_or_return(&mut self, input_idx: InputIdx, txn: Txn) -> Option { + let address = txn.parse_sender(); + let account_opt = self.accounts.get_mut(&address); + let use_case_key = txn.parse_use_case(); + let use_case_opt = self.use_cases.get_mut(&use_case_key); + + let account_should_delay = account_opt.as_ref().map_or(false, |account| { + !account.is_empty() // needs delaying due to queued txns under the same account + || account.try_delay_till > self.output_idx + }); + let use_case_should_delay = use_case_opt + .as_ref() + .map_or(false, |use_case| use_case.try_delay_till > self.output_idx); + + if account_should_delay || use_case_should_delay { + self.queue_txn(input_idx, address, use_case_key, txn); + None + } else { + self.update_delays_for_selected_txn(input_idx, address, use_case_key); + Some(txn) + } + } +} diff --git a/consensus/src/transaction_shuffler/use_case_aware/iterator.rs b/consensus/src/transaction_shuffler/use_case_aware/iterator.rs new file mode 100644 index 0000000000000..be1c5730ad93c --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/iterator.rs @@ -0,0 +1,100 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::transaction_shuffler::use_case_aware::{ + delayed_queue::DelayedQueue, + types::{InputIdx, OutputIdx, UseCaseAwareTransaction}, + Config, +}; +use std::{collections::VecDeque, fmt::Debug}; + +#[derive(Debug)] +pub(super) struct ShuffledTransactionIterator { + input_queue: VecDeque, + delayed_queue: DelayedQueue, + input_idx: InputIdx, + output_idx: OutputIdx, +} + +impl ShuffledTransactionIterator +where + Txn: UseCaseAwareTransaction + Debug, +{ + pub(super) fn new(config: Config) -> Self { + Self { + input_queue: VecDeque::new(), + delayed_queue: DelayedQueue::new(config), + input_idx: 0, + output_idx: 0, + } + } + + pub(super) fn extended_with(mut self, txns: impl IntoIterator) -> Self { + self.input_queue.extend(txns); + self + } + + pub(super) fn select_next_txn(&mut self) -> Option { + let ret = self.select_next_txn_inner(); + if ret.is_some() { + self.output_idx += 1; + } + ret + } + + pub(super) fn select_next_txn_inner(&mut self) -> Option { + // println!("\n### {}: selection started.\n", self.output_idx); + // println!("Starting with state:"); + // println!("{:#?}\n", self); + + self.delayed_queue.bump_output_idx(self.output_idx); + // println!("After bumping the output idx:"); + // println!("{:#?}\n", self); + + // 1. if anything delayed became ready, return it + if let Some(txn) = self.delayed_queue.pop_head(true) { + // println!( + // "--- {}: Selected {:?} from the delayed queue", + // self.output_idx, txn + // ); + return Some(txn); + } + + // 2. Otherwise, seek in the input queue for something that shouldn't be delayed due to either + // the sender or the use case. + while let Some(txn) = self.input_queue.pop_front() { + let input_idx = self.input_idx; + // println!( + // "--- {}: examining {:?} from the input queue. input_idx: {input_idx}", + // self.output_idx, txn + // ); + self.input_idx += 1; + + if let Some(txn) = self.delayed_queue.queue_or_return(input_idx, txn) { + // println!( + // "--- {}: Selected {:?} from the input queue", + // self.output_idx, txn + // ); + return Some(txn); + } + } + + // 3. If nothing is ready, return the next eligible from the delay queue + self.delayed_queue.pop_head(false) + // println!( + // "--- {}: force select head {:?} from the delay queue", + // self.output_idx, ret + // ); + } +} + +impl Iterator for ShuffledTransactionIterator +where + Txn: UseCaseAwareTransaction + Debug, +{ + type Item = Txn; + + fn next(&mut self) -> Option { + self.select_next_txn() + } +} diff --git a/consensus/src/transaction_shuffler/use_case_aware/mod.rs b/consensus/src/transaction_shuffler/use_case_aware/mod.rs new file mode 100644 index 0000000000000..5a6daff997bbb --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/mod.rs @@ -0,0 +1,48 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::transaction_shuffler::{use_case_aware::types::UseCaseKey, TransactionShuffler}; +use aptos_types::transaction::SignedTransaction; +use iterator::ShuffledTransactionIterator; + +pub(crate) mod iterator; +pub(crate) mod types; +pub(crate) mod utils; + +pub(crate) mod delayed_queue; +#[cfg(test)] +mod tests; + +#[derive(Clone, Debug, Default)] +pub(crate) struct Config { + pub sender_spread_factor: usize, + pub platform_use_case_spread_factor: usize, + pub user_use_case_spread_factor: usize, +} + +impl Config { + pub(crate) fn sender_spread_factor(&self) -> usize { + self.sender_spread_factor + } + + pub(crate) fn use_case_spread_factor(&self, use_case_key: &UseCaseKey) -> usize { + use UseCaseKey::*; + + match use_case_key { + Platform => self.platform_use_case_spread_factor, + ContractAddress(..) | Others => self.user_use_case_spread_factor, + } + } +} + +pub struct UseCaseAwareShuffler { + pub config: Config, +} + +impl TransactionShuffler for UseCaseAwareShuffler { + fn shuffle(&self, txns: Vec) -> Vec { + ShuffledTransactionIterator::new(self.config.clone()) + .extended_with(txns) + .collect() + } +} diff --git a/consensus/src/transaction_shuffler/use_case_aware/tests/manual.rs b/consensus/src/transaction_shuffler/use_case_aware/tests/manual.rs new file mode 100644 index 0000000000000..5b2a7d1a25c83 --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/tests/manual.rs @@ -0,0 +1,203 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::transaction_shuffler::use_case_aware::{ + iterator::ShuffledTransactionIterator, + tests, + tests::{Account, Contract}, + Config, +}; +use itertools::Itertools; + +const PP: Contract = Contract::Platform; +const OO: Contract = Contract::Others; +const C1: Contract = Contract::User(0xF1); +const C2: Contract = Contract::User(0xF2); +const C3: Contract = Contract::User(0xF3); +const A1: Account = Account(1); +const A2: Account = Account(2); +const A3: Account = Account(3); +const A4: Account = Account(4); + +fn assert_shuffle_result( + config: Config, + txns: impl IntoIterator, + expected_order: impl IntoIterator, +) { + let txns = tests::into_txns(txns); + let actual_order = ShuffledTransactionIterator::new(config) + .extended_with(txns) + .map(|txn| txn.original_idx) + .collect_vec(); + let expected_order = expected_order.into_iter().collect_vec(); + assert_eq!(actual_order, expected_order, "actual != expected"); +} + +fn three_senders_txns() -> [(Contract, Account); 10] { + [ + // 5 txns from A1 + (PP, A1), + (OO, A1), + (C1, A1), + (C2, A1), + (C3, A1), + // 3 txns from A2 + (PP, A2), + (PP, A2), + (PP, A2), + // 2 txns from A3 + (C1, A3), + (C1, A3), + ] +} + +#[test] +fn test_no_spreading() { + let config = Config { + sender_spread_factor: 0, + platform_use_case_spread_factor: 0, + user_use_case_spread_factor: 0, + }; + let txns = three_senders_txns(); + + assert_shuffle_result(config, txns, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); +} + +#[test] +fn test_spread_by_sender_1() { + let config = Config { + sender_spread_factor: 1, + // ignore use case conflicts + platform_use_case_spread_factor: 0, + // ignore use case conflicts + user_use_case_spread_factor: 0, + }; + let txns = three_senders_txns(); + + assert_shuffle_result(config, txns, [0, 5, 1, 6, 2, 7, 3, 8, 4, 9]); +} + +#[test] +fn test_spread_by_sender_by_large_factor() { + for sender_spread_factor in [2, 3, 4] { + let config = Config { + sender_spread_factor, + platform_use_case_spread_factor: 0, + user_use_case_spread_factor: 0, + }; + let txns = three_senders_txns(); + + assert_shuffle_result(config, txns, [0, 5, 8, 1, 6, 9, 2, 7, 3, 4]); + } +} + +fn three_contracts_txns() -> [(Contract, Account); 10] { + [ + // 5 txns from C1 + (C1, A1), + (C1, A1), + (C1, A1), + (C1, A1), + (C1, A1), + // 3 txns from C2 + (C2, A2), + (C2, A2), + (C2, A2), + // 2 txns from C3 + (C3, A3), + (C3, A3), + ] +} + +#[test] +fn test_spread_by_use_case_1() { + let config = Config { + sender_spread_factor: 0, + platform_use_case_spread_factor: 0, + user_use_case_spread_factor: 1, + }; + let txns = three_contracts_txns(); + + assert_shuffle_result(config, txns, [0, 5, 1, 6, 2, 7, 3, 8, 4, 9]); +} + +#[test] +fn test_spread_by_use_case_by_large_factor() { + for user_use_case_spread_factor in [2, 3, 4] { + let config = Config { + sender_spread_factor: 0, + platform_use_case_spread_factor: 0, + user_use_case_spread_factor, + }; + let txns = three_contracts_txns(); + + assert_shuffle_result(config, txns, [0, 5, 8, 1, 6, 9, 2, 7, 3, 4]); + } +} + +fn user_and_platform_use_cases() -> [(Contract, Account); 10] { + [ + // 5 txns from C1 + (C1, A1), + (C1, A1), + (C1, A1), + (C1, A1), + (C1, A1), + // 3 txns from C2 + (PP, A2), + (PP, A2), + (PP, A2), + // 2 txns from C3 + (PP, A3), + (PP, A3), + ] +} + +#[test] +fn test_platform_txn_priority_0() { + let config = Config { + sender_spread_factor: 0, + platform_use_case_spread_factor: 0, + user_use_case_spread_factor: 3, + }; + let txns = user_and_platform_use_cases(); + + assert_shuffle_result(config, txns, [0, 5, 6, 7, 1, 8, 9, 2, 3, 4]); +} + +#[test] +fn test_platform_txn_priority_1() { + let config = Config { + sender_spread_factor: 0, + platform_use_case_spread_factor: 1, + user_use_case_spread_factor: 3, + }; + let txns = user_and_platform_use_cases(); + + assert_shuffle_result(config, txns, [0, 5, 6, 1, 7, 8, 2, 9, 3, 4]); +} + +#[test] +fn test_spread_sender_within_use_case() { + let config = Config { + sender_spread_factor: 2, + platform_use_case_spread_factor: 0, + user_use_case_spread_factor: 1, + }; + let txns = [ + // 5 txns from C1 + (C1, A1), + (C1, A1), + (C1, A2), + (C1, A2), + (C1, A2), + // 3 txns from C2 + (C2, A3), + (C2, A3), + (C2, A3), + (C2, A4), + (C2, A4), + ]; + + assert_shuffle_result(config, txns, [0, 5, 2, 8, 1, 6, 3, 9, 4, 7]); +} diff --git a/consensus/src/transaction_shuffler/use_case_aware/tests/mod.rs b/consensus/src/transaction_shuffler/use_case_aware/tests/mod.rs new file mode 100644 index 0000000000000..58860bd4c11cc --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/tests/mod.rs @@ -0,0 +1,94 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::transaction_shuffler::use_case_aware::types::{UseCaseAwareTransaction, UseCaseKey}; +use move_core_types::account_address::AccountAddress; +use proptest_derive::Arbitrary; +use std::fmt::Debug; + +mod manual; +mod proptests; + +#[derive(Arbitrary)] +enum Contract { + Platform, + Others, + User(u8), +} + +impl Debug for Contract { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use Contract::*; + + write!(f, "c{}", match self { + Platform => "PP".to_string(), + Others => "OO".to_string(), + User(addr) => hex::encode_upper(addr.to_be_bytes()), + }) + } +} + +#[derive(Arbitrary)] +struct Account(u8); + +impl Debug for Account { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "a{}", hex::encode_upper(self.0.to_be_bytes())) + } +} + +impl Account { + fn as_account_address(&self) -> AccountAddress { + let mut addr = [0u8; 32]; + addr[31..].copy_from_slice(&self.0.to_be_bytes()); + AccountAddress::new(addr) + } +} + +struct Transaction { + contract: Contract, + sender: Account, + original_idx: usize, +} + +impl Debug for Transaction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "t{}:{:?}{:?}", + self.original_idx, self.contract, self.sender + ) + } +} + +impl UseCaseAwareTransaction for Transaction { + fn parse_sender(&self) -> AccountAddress { + self.sender.as_account_address() + } + + fn parse_use_case(&self) -> UseCaseKey { + use UseCaseKey::*; + + match self.contract { + Contract::Platform => Platform, + Contract::Others => Others, + Contract::User(c) => ContractAddress(Account(c).as_account_address()), + } + } +} + +fn into_txns(txns: impl IntoIterator) -> Vec { + let mut original_idx = 0; + txns.into_iter() + .map(|(contract, sender)| { + let txn = Transaction { + contract, + sender, + original_idx, + }; + + original_idx += 1; + txn + }) + .collect() +} diff --git a/consensus/src/transaction_shuffler/use_case_aware/tests/proptests.rs b/consensus/src/transaction_shuffler/use_case_aware/tests/proptests.rs new file mode 100644 index 0000000000000..ef84499776d34 --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/tests/proptests.rs @@ -0,0 +1,49 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::transaction_shuffler::use_case_aware::{ + iterator::ShuffledTransactionIterator, + tests::{into_txns, Account, Contract, Transaction}, + Config, +}; +use itertools::Itertools; +use proptest::{collection::vec, prelude::*}; +use std::collections::HashMap; + +fn txn_indices_by_account(txns: &[Transaction]) -> HashMap> { + txns.iter() + .map(|txn| (txn.sender.0, txn.original_idx)) + .into_group_map() +} + +proptest! { + #[test] + fn test_no_panic( + txns in vec(any::<(Contract, Account)>(), 0..100) + .prop_map(into_txns), + sender_factor in 0..100usize, + platform_factor in 0..100usize, + user_contract_factor in 0..100usize, + ) { + let num_txns = txns.len(); + let txns_by_account = txn_indices_by_account(&txns); + + let config = Config { + sender_spread_factor: sender_factor, + platform_use_case_spread_factor: platform_factor, + user_use_case_spread_factor: user_contract_factor, + }; + + let shuffled_txns = ShuffledTransactionIterator::new(config) + .extended_with(txns) + .collect_vec(); + + prop_assert_eq!( + txn_indices_by_account(&shuffled_txns), + txns_by_account + ); + + let txn_indices = shuffled_txns.into_iter().map(|txn| txn.original_idx).sorted().collect_vec(); + prop_assert_eq!(txn_indices, (0..num_txns).collect_vec()); + } +} diff --git a/consensus/src/transaction_shuffler/use_case_aware/types.rs b/consensus/src/transaction_shuffler/use_case_aware/types.rs new file mode 100644 index 0000000000000..3ffda393e0640 --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/types.rs @@ -0,0 +1,58 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_types::transaction::SignedTransaction; +use move_core_types::account_address::AccountAddress; + +pub(crate) type InputIdx = usize; +pub(crate) type OutputIdx = usize; + +#[derive(Clone, Eq, Hash, PartialEq)] +pub(crate) enum UseCaseKey { + Platform, + ContractAddress(AccountAddress), + // ModuleBundle (deprecated anyway), scripts, Multisig. + Others, +} + +impl std::fmt::Debug for UseCaseKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use UseCaseKey::*; + + match self { + Platform => write!(f, "PP"), + ContractAddress(addr) => write!(f, "c{}", hex::encode_upper(&addr[31..])), + Others => write!(f, "OO"), + } + } +} + +pub(crate) trait UseCaseAwareTransaction { + fn parse_sender(&self) -> AccountAddress; + + fn parse_use_case(&self) -> UseCaseKey; +} + +impl UseCaseAwareTransaction for SignedTransaction { + fn parse_sender(&self) -> AccountAddress { + self.sender() + } + + fn parse_use_case(&self) -> UseCaseKey { + use aptos_types::transaction::TransactionPayload::*; + use UseCaseKey::*; + + match self.payload() { + Script(_) | ModuleBundle(_) | Multisig(_) => Others, + EntryFunction(entry_fun) => { + let module_id = entry_fun.module(); + if module_id.address().is_special() { + Platform + } else { + // n.b. Generics ignored. + ContractAddress(*module_id.address()) + } + }, + } + } +} diff --git a/consensus/src/transaction_shuffler/use_case_aware/utils.rs b/consensus/src/transaction_shuffler/use_case_aware/utils.rs new file mode 100644 index 0000000000000..a0e75d075766e --- /dev/null +++ b/consensus/src/transaction_shuffler/use_case_aware/utils.rs @@ -0,0 +1,43 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + collections::{BTreeMap, HashMap}, + hash::Hash, +}; + +pub(crate) trait StrictMap { + fn strict_insert(&mut self, key: K, value: V); + + fn strict_remove(&mut self, key: &K); + + fn expect_mut(&mut self, key: &K) -> &mut V; +} + +impl StrictMap for HashMap { + fn strict_insert(&mut self, key: K, value: V) { + assert!(self.insert(key, value).is_none()) + } + + fn strict_remove(&mut self, key: &K) { + assert!(self.remove(key).is_some()) + } + + fn expect_mut(&mut self, key: &K) -> &mut V { + self.get_mut(key).expect("Known to exist.") + } +} + +impl StrictMap for BTreeMap { + fn strict_insert(&mut self, key: K, value: V) { + assert!(self.insert(key, value).is_none()) + } + + fn strict_remove(&mut self, key: &K) { + assert!(self.remove(key).is_some()) + } + + fn expect_mut(&mut self, key: &K) -> &mut V { + self.get_mut(key).expect("Known to exist.") + } +} diff --git a/types/src/on_chain_config/execution_config.rs b/types/src/on_chain_config/execution_config.rs index 6f091d11cd30c..19378782623ad 100644 --- a/types/src/on_chain_config/execution_config.rs +++ b/types/src/on_chain_config/execution_config.rs @@ -70,7 +70,11 @@ impl OnChainExecutionConfig { /// Features that are ready for deployment can be enabled here. pub fn default_for_genesis() -> Self { OnChainExecutionConfig::V4(ExecutionConfigV4 { - transaction_shuffler_type: TransactionShufflerType::SenderAwareV2(32), + transaction_shuffler_type: TransactionShufflerType::UseCaseAware { + sender_spread_factor: 32, + platform_use_case_spread_factor: 0, + user_use_case_spread_factor: 32, + }, block_gas_limit_type: BlockGasLimitType::default_for_genesis(), transaction_deduper_type: TransactionDeduperType::TxnHashAndAuthenticatorV1, }) @@ -153,6 +157,11 @@ pub enum TransactionShufflerType { module_conflict_window_size: u32, entry_fun_conflict_window_size: u32, }, + UseCaseAware { + sender_spread_factor: usize, + platform_use_case_spread_factor: usize, + user_use_case_spread_factor: usize, + }, } #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]