Skip to content

Commit

Permalink
compat expansion; forge refactor (#13302)
Browse files Browse the repository at this point in the history
Expand "compat" forge test to simultaneously do traffic generation, gather stats, and run a gradual upgrade of validator nodes.

Lots of refactor to get there, replacing lots of `&Foo` and `&mut Foo` with `Arc<Mutex<Foo>>` and hidden internal mutability to make things multithread capable.
  • Loading branch information
brianolson authored Jun 27, 2024
1 parent f90f322 commit de7be81
Show file tree
Hide file tree
Showing 86 changed files with 2,262 additions and 1,600 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 33 additions & 25 deletions crates/transaction-emitter-lib/src/emitter/account_minter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use aptos_sdk::{
use aptos_transaction_generator_lib::{
CounterState, ReliableTransactionSubmitter, RootAccountHandle, SEND_AMOUNT,
};
use aptos_types::account_address::AccountAddress;
use core::{
cmp::min,
result::Result::{Err, Ok},
Expand All @@ -31,7 +32,7 @@ use std::{
};

pub struct SourceAccountManager<'t> {
pub source_account: &'t LocalAccount,
pub source_account: Arc<LocalAccount>,
pub txn_executor: &'t dyn ReliableTransactionSubmitter,
pub req: &'t EmitJobRequest,
pub txn_factory: TransactionFactory,
Expand All @@ -43,17 +44,21 @@ impl<'t> RootAccountHandle for SourceAccountManager<'t> {
self.check_approve_funds(amount, reason).await.unwrap();
}

fn get_root_account(&self) -> &LocalAccount {
self.source_account
fn get_root_account(&self) -> Arc<LocalAccount> {
self.source_account.clone()
}
}

impl<'t> SourceAccountManager<'t> {
fn source_account_address(&self) -> AccountAddress {
self.source_account.address()
}

// returns true if we might want to recheck the volume, as it was auto-approved.
async fn check_approve_funds(&self, amount: u64, reason: &str) -> Result<bool> {
let balance = self
.txn_executor
.get_account_balance(self.source_account.address())
.get_account_balance(self.source_account_address())
.await?;
Ok(if self.req.mint_to_root {
// We have a root account, so amount of funds minted is not a problem
Expand All @@ -63,7 +68,7 @@ impl<'t> SourceAccountManager<'t> {
if balance < amount.checked_mul(100).unwrap_or(u64::MAX / 2) {
info!(
"Mint account {} current balance is {}, needing {} for {}, minting to refil it fully",
self.source_account.address(),
self.source_account_address(),
balance,
amount,
reason,
Expand All @@ -74,7 +79,7 @@ impl<'t> SourceAccountManager<'t> {
} else {
info!(
"Mint account {} current balance is {}, needing {} for {}. Proceeding without minting, as balance would overflow otherwise",
self.source_account.address(),
self.source_account_address(),
balance,
amount,
reason,
Expand All @@ -85,7 +90,7 @@ impl<'t> SourceAccountManager<'t> {
} else {
info!(
"Source account {} current balance is {}, needed {} coins for {}, or {:.3}% of its balance",
self.source_account.address(),
self.source_account_address(),
balance,
amount,
reason,
Expand All @@ -95,7 +100,7 @@ impl<'t> SourceAccountManager<'t> {
if balance < amount {
return Err(anyhow!(
"Source ({}) doesn't have enough coins, balance {} < needed {} for {}",
self.source_account.address(),
self.source_account_address(),
balance,
amount,
reason
Expand Down Expand Up @@ -128,15 +133,15 @@ impl<'t> SourceAccountManager<'t> {
let txn = self
.source_account
.sign_with_transaction_builder(self.txn_factory.payload(
aptos_stdlib::aptos_coin_mint(self.source_account.address(), amount),
aptos_stdlib::aptos_coin_mint(self.source_account_address(), amount),
));

if let Err(e) = txn_executor.execute_transactions(&[txn]).await {
// This cannot work simultaneously across different txn emitters,
// so check on failure if another emitter has refilled it instead

let balance = txn_executor
.get_account_balance(self.source_account.address())
.get_account_balance(self.source_account_address())
.await?;
if balance > u64::MAX / 2 {
Ok(())
Expand Down Expand Up @@ -393,7 +398,7 @@ impl<'t> AccountMinter<'t> {

pub async fn create_and_fund_seed_accounts(
&mut self,
mut new_source_account: Option<LocalAccount>,
new_source_account: Option<LocalAccount>,
txn_executor: &dyn ReliableTransactionSubmitter,
account_generator: Box<dyn LocalAccountGenerator>,
seed_account_num: usize,
Expand All @@ -407,6 +412,10 @@ impl<'t> AccountMinter<'t> {
);
let mut i = 0;
let mut seed_accounts = vec![];
let source_account = match new_source_account {
None => self.source_account.get_root_account().clone(),
Some(param_account) => Arc::new(param_account),
};
while i < seed_account_num {
let batch_size = min(max_submit_batch_size, seed_account_num - i);
let mut rng = StdRng::from_rng(self.rng()).unwrap();
Expand All @@ -418,11 +427,7 @@ impl<'t> AccountMinter<'t> {
.iter()
.map(|account| {
create_and_fund_account_request(
if let Some(account) = &mut new_source_account {
account
} else {
self.source_account.get_root_account()
},
source_account.clone(),
coins_per_seed_account,
account.public_key(),
txn_factory,
Expand Down Expand Up @@ -470,16 +475,17 @@ impl<'t> AccountMinter<'t> {
coins_for_source: u64,
) -> Result<LocalAccount> {
const NUM_TRIES: usize = 3;
let root_account = self.source_account.get_root_account();
let root_address = root_account.address();
for i in 0..NUM_TRIES {
self.source_account.get_root_account().set_sequence_number(
txn_executor
.query_sequence_number(self.source_account.get_root_account().address())
.await?,
);
{
let new_sequence_number = txn_executor.query_sequence_number(root_address).await?;
root_account.set_sequence_number(new_sequence_number);
}

let new_source_account = LocalAccount::generate(self.rng());
let txn = create_and_fund_account_request(
self.source_account.get_root_account(),
root_account.clone(),
coins_for_source,
new_source_account.public_key(),
&self.txn_factory,
Expand Down Expand Up @@ -530,12 +536,14 @@ async fn create_and_fund_new_accounts(
.chunks(max_num_accounts_per_batch)
.map(|chunk| chunk.to_vec())
.collect::<Vec<_>>();
let source_address = source_account.address();
let source_account = Arc::new(source_account);
for batch in accounts_by_batch {
let creation_requests: Vec<_> = batch
.iter()
.map(|account| {
create_and_fund_account_request(
&source_account,
source_account.clone(),
coins_per_new_account,
account.public_key(),
txn_factory,
Expand All @@ -546,13 +554,13 @@ async fn create_and_fund_new_accounts(
txn_executor
.execute_transactions_with_counter(&creation_requests, counters)
.await
.with_context(|| format!("Account {} couldn't mint", source_account.address()))?;
.with_context(|| format!("Account {} couldn't mint", source_address))?;
}
Ok(())
}

pub fn create_and_fund_account_request(
creation_account: &LocalAccount,
creation_account: Arc<LocalAccount>,
amount: u64,
pubkey: &Ed25519PublicKey,
txn_factory: &TransactionFactory,
Expand Down
90 changes: 9 additions & 81 deletions crates/transaction-emitter-lib/src/emitter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::emitter::{
use again::RetryPolicy;
use anyhow::{ensure, format_err, Result};
use aptos_config::config::DEFAULT_MAX_SUBMIT_TRANSACTION_BATCH_SIZE;
use aptos_logger::{debug, error, info, sample, sample::SampleRate, warn};
use aptos_logger::{error, info, sample, sample::SampleRate, warn};
use aptos_rest_client::{aptos_api_types::AptosErrorCode, error::RestError, Client as RestClient};
use aptos_sdk::{
move_types::account_address::AccountAddress,
Expand Down Expand Up @@ -649,7 +649,7 @@ impl EmitJob {
}
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct TxnEmitter {
txn_factory: TransactionFactory,
rng: StdRng,
Expand All @@ -673,7 +673,7 @@ impl TxnEmitter {

pub async fn start_job(
&mut self,
root_account: &LocalAccount,
root_account: Arc<LocalAccount>,
req: EmitJobRequest,
stats_tracking_phases: usize,
) -> Result<EmitJob> {
Expand Down Expand Up @@ -708,7 +708,7 @@ impl TxnEmitter {
let account_generator = create_account_generator(req.account_type);

let mut all_accounts = create_accounts(
root_account,
root_account.clone(),
&init_txn_factory,
account_generator,
&req,
Expand All @@ -730,7 +730,7 @@ impl TxnEmitter {
retry_after: req.init_retry_interval,
};
let source_account_manager = SourceAccountManager {
source_account: root_account,
source_account: root_account.clone(),
txn_executor: &txn_executor,
req: &req,
txn_factory: init_txn_factory.clone(),
Expand Down Expand Up @@ -819,7 +819,7 @@ impl TxnEmitter {

async fn emit_txn_for_impl(
mut self,
source_account: &LocalAccount,
source_account: Arc<LocalAccount>,
emit_job_request: EmitJobRequest,
duration: Duration,
print_stats_interval: Option<u64>,
Expand Down Expand Up @@ -855,7 +855,7 @@ impl TxnEmitter {

pub async fn emit_txn_for(
self,
source_account: &mut LocalAccount,
source_account: Arc<LocalAccount>,
emit_job_request: EmitJobRequest,
duration: Duration,
) -> Result<TxnStats> {
Expand All @@ -865,7 +865,7 @@ impl TxnEmitter {

pub async fn emit_txn_for_with_stats(
self,
source_account: &LocalAccount,
source_account: Arc<LocalAccount>,
emit_job_request: EmitJobRequest,
duration: Duration,
interval_secs: u64,
Expand Down Expand Up @@ -982,78 +982,6 @@ async fn wait_for_accounts_sequence(
(latest_fetched_counts, sum_of_completion_timestamps_millis)
}

fn update_seq_num_and_get_num_expired(
accounts: &mut [LocalAccount],
account_to_start_and_end_seq_num: HashMap<AccountAddress, (u64, u64)>,
latest_fetched_counts: HashMap<AccountAddress, u64>,
) -> (usize, usize) {
accounts.iter_mut().for_each(|account| {
let (start_seq_num, end_seq_num) =
if let Some(pair) = account_to_start_and_end_seq_num.get(&account.address()) {
pair
} else {
return;
};
assert!(account.sequence_number() == *end_seq_num);

match latest_fetched_counts.get(&account.address()) {
Some(count) => {
if *count != account.sequence_number() {
assert!(account.sequence_number() > *count);
debug!(
"Stale sequence_number for {}, expected {}, setting to {}",
account.address(),
account.sequence_number(),
count
);
account.set_sequence_number(*count);
}
},
None => {
debug!(
"Couldn't fetch sequence_number for {}, expected {}, setting to {}",
account.address(),
account.sequence_number(),
start_seq_num
);
account.set_sequence_number(*start_seq_num);
},
}
});

account_to_start_and_end_seq_num
.iter()
.map(
|(address, (start_seq_num, end_seq_num))| match latest_fetched_counts.get(address) {
Some(count) => {
assert!(
*count <= *end_seq_num,
"{address} :: {count} > {end_seq_num}"
);
if *count >= *start_seq_num {
(
(*count - *start_seq_num) as usize,
(*end_seq_num - *count) as usize,
)
} else {
debug!(
"Stale sequence_number fetched for {}, start_seq_num {}, fetched {}",
address, start_seq_num, *count
);
(0, (*end_seq_num - *start_seq_num) as usize)
}
},
None => (0, (end_seq_num - start_seq_num) as usize),
},
)
.fold(
(0, 0),
|(committed, expired), (cur_committed, cur_expired)| {
(committed + cur_committed, expired + cur_expired)
},
)
}

pub async fn query_sequence_number(client: &RestClient, address: AccountAddress) -> Result<u64> {
Ok(query_sequence_numbers(client, [address].iter()).await?.0[0].1)
}
Expand Down Expand Up @@ -1133,7 +1061,7 @@ pub fn parse_seed(seed_string: &str) -> [u8; 32] {
}

pub async fn create_accounts(
root_account: &LocalAccount,
root_account: Arc<LocalAccount>,
txn_factory: &TransactionFactory,
account_generator: Box<dyn LocalAccountGenerator>,
req: &EmitJobRequest,
Expand Down
Loading

0 comments on commit de7be81

Please sign in to comment.