diff --git a/crates/transaction-emitter-lib/src/emitter/account_minter.rs b/crates/transaction-emitter-lib/src/emitter/account_minter.rs index 30d591d3936b4..4409dfa93b383 100644 --- a/crates/transaction-emitter-lib/src/emitter/account_minter.rs +++ b/crates/transaction-emitter-lib/src/emitter/account_minter.rs @@ -230,7 +230,7 @@ impl<'t> AccountMinter<'t> { pub async fn create_and_fund_accounts( &mut self, txn_executor: &dyn ReliableTransactionSubmitter, - account_generator: Box, + seed_accounts: Vec>, local_accounts: Vec>, coins_per_account: u64, max_submit_batch_size: usize, @@ -244,10 +244,8 @@ impl<'t> AccountMinter<'t> { num_accounts, coins_per_account, ); - let expected_num_seed_accounts = - (num_accounts / 50).clamp(1, (num_accounts as f32).sqrt() as usize + 1); let expected_children_per_seed_account = - (num_accounts + expected_num_seed_accounts - 1) / expected_num_seed_accounts; + (num_accounts + seed_accounts.len() - 1) / seed_accounts.len(); let coins_per_seed_account = Self::funds_needed_for_multi_transfer( "seed", @@ -258,7 +256,7 @@ impl<'t> AccountMinter<'t> { ); let coins_for_source = Self::funds_needed_for_multi_transfer( if mint_to_root { "root" } else { "source" }, - expected_num_seed_accounts as u64, + seed_accounts.len() as u64, coins_per_seed_account, self.txn_factory.get_max_gas_amount(), self.txn_factory.get_gas_unit_price(), @@ -295,18 +293,15 @@ impl<'t> AccountMinter<'t> { // Create seed accounts with which we can create actual accounts concurrently. Adding // additional fund for paying gas fees later. - let seed_accounts = self - .create_and_fund_seed_accounts( - new_source_account, - txn_executor, - account_generator, - expected_num_seed_accounts, - coins_per_seed_account, - max_submit_batch_size, - &request_counters, - ) - .await?; - let actual_num_seed_accounts = seed_accounts.len(); + self.create_and_fund_seed_accounts( + new_source_account, + txn_executor, + &seed_accounts, + coins_per_seed_account, + max_submit_batch_size, + &request_counters, + ) + .await?; info!( "Completed creating {} seed accounts in {}s, each with {} coins, request stats: {}", @@ -326,7 +321,7 @@ impl<'t> AccountMinter<'t> { let request_counters = txn_executor.create_counter_state(); let approx_accounts_per_seed = - (num_accounts + actual_num_seed_accounts - 1) / actual_num_seed_accounts; + (num_accounts + seed_accounts.len() - 1) / seed_accounts.len(); let local_accounts_by_seed: Vec>> = local_accounts .chunks(approx_accounts_per_seed) @@ -377,12 +372,11 @@ impl<'t> AccountMinter<'t> { &mut self, new_source_account: Option, txn_executor: &dyn ReliableTransactionSubmitter, - account_generator: Box, - seed_account_num: usize, + seed_accounts: &[Arc], coins_per_seed_account: u64, max_submit_batch_size: usize, counters: &CounterState, - ) -> Result> { + ) -> Result<()> { info!( "Creating and funding seeds accounts (txn {} gas price)", self.txn_factory.get_gas_unit_price() @@ -392,10 +386,6 @@ impl<'t> AccountMinter<'t> { Some(param_account) => Arc::new(param_account), }; - let seed_accounts = account_generator - .gen_local_accounts(txn_executor, seed_account_num, self.account_rng()) - .await?; - for chunk in seed_accounts.chunks(max_submit_batch_size) { let txn_factory = &self.txn_factory; let create_requests: Vec<_> = chunk @@ -414,7 +404,7 @@ impl<'t> AccountMinter<'t> { .await?; } - Ok(seed_accounts) + Ok(()) } pub async fn load_vasp_account( @@ -496,7 +486,7 @@ impl<'t> AccountMinter<'t> { /// Create `num_new_accounts` by transferring coins from `source_account`. Return Vec of created /// accounts async fn create_and_fund_new_accounts( - source_account: LocalAccount, + source_account: Arc, accounts: Vec>, coins_per_new_account: u64, max_num_accounts_per_batch: usize, @@ -509,7 +499,6 @@ async fn create_and_fund_new_accounts( .map(|chunk| chunk.to_vec()) .collect::>(); let source_address = source_account.address(); - let source_account = Arc::new(source_account); for batch in accounts_by_batch { let creation_requests: Vec<_> = batch .iter() @@ -642,16 +631,33 @@ pub async fn bulk_create_accounts( seed ); + let mut rng = StdRng::from_seed(seed); + + let num_seed_accounts = (num_accounts / 50).clamp(1, (num_accounts as f32).sqrt() as usize + 1); + let seed_accounts = account_generator + .gen_local_accounts(txn_executor, num_seed_accounts, &mut rng) + .await?; + let accounts = account_generator - .gen_local_accounts(txn_executor, num_accounts, &mut StdRng::from_seed(seed)) + .gen_local_accounts(txn_executor, num_accounts, &mut rng) .await?; + info!( "Generated and fetched re-usable accounts for seed {:?}", seed ); let all_accounts_already_exist = accounts.iter().all(|account| account.sequence_number() > 0); - let send_money_gas = if all_accounts_already_exist { + let all_seed_accounts_already_exist = seed_accounts + .iter() + .all(|account| account.sequence_number() > 0); + + info!( + "Accounts exist: {}, seed accounts exist: {}", + all_accounts_already_exist, all_seed_accounts_already_exist + ); + + let send_money_gas = if all_accounts_already_exist && all_seed_accounts_already_exist { config.expected_gas_per_transfer } else { config.expected_gas_per_account_create @@ -668,10 +674,12 @@ pub async fn bulk_create_accounts( if !config.skip_funding_accounts { let accounts: Vec<_> = accounts.into_iter().map(Arc::new).collect(); + let seed_accounts: Vec<_> = seed_accounts.into_iter().map(Arc::new).collect(); + account_minter .create_and_fund_accounts( txn_executor, - account_generator, + seed_accounts.clone(), accounts.clone(), coins_per_account, config.max_submit_batch_size, diff --git a/crates/transaction-emitter-lib/src/emitter/mod.rs b/crates/transaction-emitter-lib/src/emitter/mod.rs index 1b9440674f563..2d7b9fddaf97e 100644 --- a/crates/transaction-emitter-lib/src/emitter/mod.rs +++ b/crates/transaction-emitter-lib/src/emitter/mod.rs @@ -57,13 +57,12 @@ pub const EXPECTED_GAS_PER_ACCOUNT_CREATE: u64 = 2000 + 8; const MAX_RETRIES: usize = 12; -// This retry policy is used for important client calls necessary for setting -// up the test (e.g. account creation) and collecting its results (e.g. checking -// account sequence numbers). If these fail, the whole test fails. We do not use -// this for submitting transactions, as we have a way to handle when that fails. -// This retry policy means an operation will take 8 seconds at most. -pub static RETRY_POLICY: Lazy = Lazy::new(|| { - RetryPolicy::exponential(Duration::from_millis(125)) +// This retry policy is used for querying sequence numbers and account balances in the initialization step. +// If these fail, the whole test fails. Backoff is large, as generally only other side +// throttling our requests is the cause for failures. +// We do not use this for submitting transactions, as we have a way to handle when that fails. +static FETCH_ACCOUNT_RETRY_POLICY: Lazy = Lazy::new(|| { + RetryPolicy::exponential(Duration::from_secs(1)) .with_max_retries(MAX_RETRIES) .with_jitter(true) }); @@ -181,6 +180,8 @@ pub struct EmitJobRequest { coordination_delay_between_instances: Duration, latency_polling_interval: Duration, + // Default additional wait is (txn_expiration_time_secs + 5). Override to wait for different length. + tps_wait_after_expiration_secs: Option, account_minter_seed: Option<[u8; 32]>, } @@ -211,6 +212,7 @@ impl Default for EmitJobRequest { prompt_before_spending: false, coordination_delay_between_instances: Duration::from_secs(0), latency_polling_interval: Duration::from_millis(300), + tps_wait_after_expiration_secs: None, account_minter_seed: None, coins_per_account_override: None, } @@ -443,7 +445,12 @@ impl EmitJobRequest { // That's why we set wait_seconds conservativelly, to make sure all processing and // client calls finish within that time. - let wait_seconds = self.txn_expiration_time_secs + 180; + let wait_seconds = + if let Some(wait_after_expiration) = self.tps_wait_after_expiration_secs { + self.txn_expiration_time_secs + wait_after_expiration + } else { + self.txn_expiration_time_secs * 2 + 5 + }; // In case we set a very low TPS, we need to still be able to spread out // transactions, at least to the seconds granularity, so we reduce transactions_per_account // if needed. @@ -1011,7 +1018,7 @@ where I: Iterator, { let futures = addresses.map(|address| { - RETRY_POLICY.retry(move || get_account_address_and_seq_num(client, *address)) + FETCH_ACCOUNT_RETRY_POLICY.retry(move || get_account_address_and_seq_num(client, *address)) }); let (seq_nums, timestamps): (Vec<_>, Vec<_>) = try_join_all(futures) diff --git a/crates/transaction-emitter-lib/src/emitter/submission_worker.rs b/crates/transaction-emitter-lib/src/emitter/submission_worker.rs index 8c986297978fd..92bdc047ceaa0 100644 --- a/crates/transaction-emitter-lib/src/emitter/submission_worker.rs +++ b/crates/transaction-emitter-lib/src/emitter/submission_worker.rs @@ -8,7 +8,7 @@ use crate::{ }, EmitModeParams, }; -use aptos_logger::{debug, info, sample, sample::SampleRate, warn}; +use aptos_logger::{debug, error, info, sample, sample::SampleRate, warn}; use aptos_rest_client::Client as RestClient; use aptos_sdk::{ move_types::account_address::AccountAddress, @@ -97,9 +97,9 @@ impl SubmissionWorker { && loop_start_time.duration_since(wait_until) > Duration::from_secs(5) { sample!( - SampleRate::Duration(Duration::from_secs(120)), - warn!( - "[{:?}] txn_emitter worker drifted out of sync too much: {}s", + SampleRate::Duration(Duration::from_secs(5)), + error!( + "[{:?}] txn_emitter worker drifted out of sync too much: {}s. Is expiration too short, or 5s buffer on top of it?", self.client().path_prefix_string(), loop_start_time.duration_since(wait_until).as_secs() ) @@ -163,7 +163,7 @@ impl SubmissionWorker { let submitted_after = loop_start_time.elapsed(); if submitted_after.as_secs() > 5 { sample!( - SampleRate::Duration(Duration::from_secs(120)), + SampleRate::Duration(Duration::from_secs(30)), warn!( "[{:?}] txn_emitter worker waited for more than 5s to submit transactions: {}s after loop start", self.client().path_prefix_string(), @@ -176,7 +176,7 @@ impl SubmissionWorker { // we also don't want to be stuck waiting for txn_expiration_time_secs // after stop is called, so we sleep until time or stop is set. self.sleep_check_done(Duration::from_secs( - self.params.txn_expiration_time_secs + 20, + self.params.txn_expiration_time_secs + 3, )) .await } @@ -271,7 +271,7 @@ impl SubmissionWorker { .expired .fetch_add(num_expired as u64, Ordering::Relaxed); sample!( - SampleRate::Duration(Duration::from_secs(120)), + SampleRate::Duration(Duration::from_secs(60)), warn!( "[{:?}] Transactions were not committed before expiration: {:?}, for {:?}", self.client().path_prefix_string(), diff --git a/crates/transaction-emitter-lib/src/emitter/transaction_executor.rs b/crates/transaction-emitter-lib/src/emitter/transaction_executor.rs index 4a2cae20c5f13..63d72060dfb29 100644 --- a/crates/transaction-emitter-lib/src/emitter/transaction_executor.rs +++ b/crates/transaction-emitter-lib/src/emitter/transaction_executor.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::RETRY_POLICY; +use super::FETCH_ACCOUNT_RETRY_POLICY; use anyhow::{Context, Result}; use aptos_logger::{debug, info, sample, sample::SampleRate, warn}; use aptos_rest_client::{aptos_api_types::AptosErrorCode, error::RestError, Client as RestClient}; @@ -269,7 +269,7 @@ pub async fn query_sequence_number_with_client( rest_client: &RestClient, account_address: AccountAddress, ) -> Result { - let result = RETRY_POLICY + let result = FETCH_ACCOUNT_RETRY_POLICY .retry_if( move || rest_client.get_account_bcs(account_address), |error: &RestError| !is_account_not_found(error), @@ -294,7 +294,7 @@ fn is_account_not_found(error: &RestError) -> bool { #[async_trait] impl ReliableTransactionSubmitter for RestApiReliableTransactionSubmitter { async fn get_account_balance(&self, account_address: AccountAddress) -> Result { - Ok(RETRY_POLICY + Ok(FETCH_ACCOUNT_RETRY_POLICY .retry(move || { self.random_rest_client() .get_account_balance(account_address)