Skip to content

Commit

Permalink
[txn-emitter] Fix check whether seed accounts exist, and reduce num a…
Browse files Browse the repository at this point in the history
…ccounts needed for --target-tps
  • Loading branch information
igor-aptos committed Jul 18, 2024
1 parent 2591aae commit f187562
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 50 deletions.
70 changes: 39 additions & 31 deletions crates/transaction-emitter-lib/src/emitter/account_minter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl<'t> AccountMinter<'t> {
pub async fn create_and_fund_accounts(
&mut self,
txn_executor: &dyn ReliableTransactionSubmitter,
account_generator: Box<dyn LocalAccountGenerator>,
seed_accounts: Vec<Arc<LocalAccount>>,
local_accounts: Vec<Arc<LocalAccount>>,
coins_per_account: u64,
max_submit_batch_size: usize,
Expand All @@ -244,10 +244,8 @@ impl<'t> AccountMinter<'t> {
num_accounts, coins_per_account,
);

let expected_num_seed_accounts =
(num_accounts / 50).clamp(1, (num_accounts as f32).sqrt() as usize + 1);
let expected_children_per_seed_account =
(num_accounts + expected_num_seed_accounts - 1) / expected_num_seed_accounts;
(num_accounts + seed_accounts.len() - 1) / seed_accounts.len();

let coins_per_seed_account = Self::funds_needed_for_multi_transfer(
"seed",
Expand All @@ -258,7 +256,7 @@ impl<'t> AccountMinter<'t> {
);
let coins_for_source = Self::funds_needed_for_multi_transfer(
if mint_to_root { "root" } else { "source" },
expected_num_seed_accounts as u64,
seed_accounts.len() as u64,
coins_per_seed_account,
self.txn_factory.get_max_gas_amount(),
self.txn_factory.get_gas_unit_price(),
Expand Down Expand Up @@ -295,18 +293,15 @@ impl<'t> AccountMinter<'t> {

// Create seed accounts with which we can create actual accounts concurrently. Adding
// additional fund for paying gas fees later.
let seed_accounts = self
.create_and_fund_seed_accounts(
new_source_account,
txn_executor,
account_generator,
expected_num_seed_accounts,
coins_per_seed_account,
max_submit_batch_size,
&request_counters,
)
.await?;
let actual_num_seed_accounts = seed_accounts.len();
self.create_and_fund_seed_accounts(
new_source_account,
txn_executor,
&seed_accounts,
coins_per_seed_account,
max_submit_batch_size,
&request_counters,
)
.await?;

info!(
"Completed creating {} seed accounts in {}s, each with {} coins, request stats: {}",
Expand All @@ -326,7 +321,7 @@ impl<'t> AccountMinter<'t> {
let request_counters = txn_executor.create_counter_state();

let approx_accounts_per_seed =
(num_accounts + actual_num_seed_accounts - 1) / actual_num_seed_accounts;
(num_accounts + seed_accounts.len() - 1) / seed_accounts.len();

let local_accounts_by_seed: Vec<Vec<Arc<LocalAccount>>> = local_accounts
.chunks(approx_accounts_per_seed)
Expand Down Expand Up @@ -377,12 +372,11 @@ impl<'t> AccountMinter<'t> {
&mut self,
new_source_account: Option<LocalAccount>,
txn_executor: &dyn ReliableTransactionSubmitter,
account_generator: Box<dyn LocalAccountGenerator>,
seed_account_num: usize,
seed_accounts: &[Arc<LocalAccount>],
coins_per_seed_account: u64,
max_submit_batch_size: usize,
counters: &CounterState,
) -> Result<Vec<LocalAccount>> {
) -> Result<()> {
info!(
"Creating and funding seeds accounts (txn {} gas price)",
self.txn_factory.get_gas_unit_price()
Expand All @@ -392,10 +386,6 @@ impl<'t> AccountMinter<'t> {
Some(param_account) => Arc::new(param_account),
};

let seed_accounts = account_generator
.gen_local_accounts(txn_executor, seed_account_num, self.account_rng())
.await?;

for chunk in seed_accounts.chunks(max_submit_batch_size) {
let txn_factory = &self.txn_factory;
let create_requests: Vec<_> = chunk
Expand All @@ -414,7 +404,7 @@ impl<'t> AccountMinter<'t> {
.await?;
}

Ok(seed_accounts)
Ok(())
}

pub async fn load_vasp_account(
Expand Down Expand Up @@ -496,7 +486,7 @@ impl<'t> AccountMinter<'t> {
/// Create `num_new_accounts` by transferring coins from `source_account`. Return Vec of created
/// accounts
async fn create_and_fund_new_accounts(
source_account: LocalAccount,
source_account: Arc<LocalAccount>,
accounts: Vec<Arc<LocalAccount>>,
coins_per_new_account: u64,
max_num_accounts_per_batch: usize,
Expand All @@ -509,7 +499,6 @@ async fn create_and_fund_new_accounts(
.map(|chunk| chunk.to_vec())
.collect::<Vec<_>>();
let source_address = source_account.address();
let source_account = Arc::new(source_account);
for batch in accounts_by_batch {
let creation_requests: Vec<_> = batch
.iter()
Expand Down Expand Up @@ -642,16 +631,33 @@ pub async fn bulk_create_accounts(
seed
);

let mut rng = StdRng::from_seed(seed);

let num_seed_accounts = (num_accounts / 50).clamp(1, (num_accounts as f32).sqrt() as usize + 1);
let seed_accounts = account_generator
.gen_local_accounts(txn_executor, num_seed_accounts, &mut rng)
.await?;

let accounts = account_generator
.gen_local_accounts(txn_executor, num_accounts, &mut StdRng::from_seed(seed))
.gen_local_accounts(txn_executor, num_accounts, &mut rng)
.await?;

info!(
"Generated and fetched re-usable accounts for seed {:?}",
seed
);

let all_accounts_already_exist = accounts.iter().all(|account| account.sequence_number() > 0);
let send_money_gas = if all_accounts_already_exist {
let all_seed_accounts_already_exist = seed_accounts
.iter()
.all(|account| account.sequence_number() > 0);

info!(
"Accounts exist: {}, seed accounts exist: {}",
all_accounts_already_exist, all_seed_accounts_already_exist
);

let send_money_gas = if all_accounts_already_exist && all_seed_accounts_already_exist {
config.expected_gas_per_transfer
} else {
config.expected_gas_per_account_create
Expand All @@ -668,10 +674,12 @@ pub async fn bulk_create_accounts(

if !config.skip_funding_accounts {
let accounts: Vec<_> = accounts.into_iter().map(Arc::new).collect();
let seed_accounts: Vec<_> = seed_accounts.into_iter().map(Arc::new).collect();

account_minter
.create_and_fund_accounts(
txn_executor,
account_generator,
seed_accounts.clone(),
accounts.clone(),
coins_per_account,
config.max_submit_batch_size,
Expand Down
25 changes: 16 additions & 9 deletions crates/transaction-emitter-lib/src/emitter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,12 @@ pub const EXPECTED_GAS_PER_ACCOUNT_CREATE: u64 = 2000 + 8;

const MAX_RETRIES: usize = 12;

// This retry policy is used for important client calls necessary for setting
// up the test (e.g. account creation) and collecting its results (e.g. checking
// account sequence numbers). If these fail, the whole test fails. We do not use
// this for submitting transactions, as we have a way to handle when that fails.
// This retry policy means an operation will take 8 seconds at most.
pub static RETRY_POLICY: Lazy<RetryPolicy> = Lazy::new(|| {
RetryPolicy::exponential(Duration::from_millis(125))
// This retry policy is used for querying sequence numbers and account balances in the initialization step.
// If these fail, the whole test fails. Backoff is large, as generally only other side
// throttling our requests is the cause for failures.
// We do not use this for submitting transactions, as we have a way to handle when that fails.
static FETCH_ACCOUNT_RETRY_POLICY: Lazy<RetryPolicy> = Lazy::new(|| {
RetryPolicy::exponential(Duration::from_secs(1))
.with_max_retries(MAX_RETRIES)
.with_jitter(true)
});
Expand Down Expand Up @@ -181,6 +180,8 @@ pub struct EmitJobRequest {
coordination_delay_between_instances: Duration,

latency_polling_interval: Duration,
// Default additional wait is (txn_expiration_time_secs + 5). Override to wait for different length.
tps_wait_after_expiration_secs: Option<u64>,

account_minter_seed: Option<[u8; 32]>,
}
Expand Down Expand Up @@ -211,6 +212,7 @@ impl Default for EmitJobRequest {
prompt_before_spending: false,
coordination_delay_between_instances: Duration::from_secs(0),
latency_polling_interval: Duration::from_millis(300),
tps_wait_after_expiration_secs: None,
account_minter_seed: None,
coins_per_account_override: None,
}
Expand Down Expand Up @@ -443,7 +445,12 @@ impl EmitJobRequest {
// That's why we set wait_seconds conservativelly, to make sure all processing and
// client calls finish within that time.

let wait_seconds = self.txn_expiration_time_secs + 180;
let wait_seconds =
if let Some(wait_after_expiration) = self.tps_wait_after_expiration_secs {
self.txn_expiration_time_secs + wait_after_expiration
} else {
self.txn_expiration_time_secs * 2 + 5
};
// In case we set a very low TPS, we need to still be able to spread out
// transactions, at least to the seconds granularity, so we reduce transactions_per_account
// if needed.
Expand Down Expand Up @@ -1011,7 +1018,7 @@ where
I: Iterator<Item = &'a AccountAddress>,
{
let futures = addresses.map(|address| {
RETRY_POLICY.retry(move || get_account_address_and_seq_num(client, *address))
FETCH_ACCOUNT_RETRY_POLICY.retry(move || get_account_address_and_seq_num(client, *address))
});

let (seq_nums, timestamps): (Vec<_>, Vec<_>) = try_join_all(futures)
Expand Down
14 changes: 7 additions & 7 deletions crates/transaction-emitter-lib/src/emitter/submission_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
},
EmitModeParams,
};
use aptos_logger::{debug, info, sample, sample::SampleRate, warn};
use aptos_logger::{debug, error, info, sample, sample::SampleRate, warn};
use aptos_rest_client::Client as RestClient;
use aptos_sdk::{
move_types::account_address::AccountAddress,
Expand Down Expand Up @@ -97,9 +97,9 @@ impl SubmissionWorker {
&& loop_start_time.duration_since(wait_until) > Duration::from_secs(5)
{
sample!(
SampleRate::Duration(Duration::from_secs(120)),
warn!(
"[{:?}] txn_emitter worker drifted out of sync too much: {}s",
SampleRate::Duration(Duration::from_secs(5)),
error!(
"[{:?}] txn_emitter worker drifted out of sync too much: {}s. Is expiration too short, or 5s buffer on top of it?",
self.client().path_prefix_string(),
loop_start_time.duration_since(wait_until).as_secs()
)
Expand Down Expand Up @@ -163,7 +163,7 @@ impl SubmissionWorker {
let submitted_after = loop_start_time.elapsed();
if submitted_after.as_secs() > 5 {
sample!(
SampleRate::Duration(Duration::from_secs(120)),
SampleRate::Duration(Duration::from_secs(30)),
warn!(
"[{:?}] txn_emitter worker waited for more than 5s to submit transactions: {}s after loop start",
self.client().path_prefix_string(),
Expand All @@ -176,7 +176,7 @@ impl SubmissionWorker {
// we also don't want to be stuck waiting for txn_expiration_time_secs
// after stop is called, so we sleep until time or stop is set.
self.sleep_check_done(Duration::from_secs(
self.params.txn_expiration_time_secs + 20,
self.params.txn_expiration_time_secs + 3,
))
.await
}
Expand Down Expand Up @@ -271,7 +271,7 @@ impl SubmissionWorker {
.expired
.fetch_add(num_expired as u64, Ordering::Relaxed);
sample!(
SampleRate::Duration(Duration::from_secs(120)),
SampleRate::Duration(Duration::from_secs(60)),
warn!(
"[{:?}] Transactions were not committed before expiration: {:?}, for {:?}",
self.client().path_prefix_string(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::RETRY_POLICY;
use super::FETCH_ACCOUNT_RETRY_POLICY;
use anyhow::{Context, Result};
use aptos_logger::{debug, info, sample, sample::SampleRate, warn};
use aptos_rest_client::{aptos_api_types::AptosErrorCode, error::RestError, Client as RestClient};
Expand Down Expand Up @@ -269,7 +269,7 @@ pub async fn query_sequence_number_with_client(
rest_client: &RestClient,
account_address: AccountAddress,
) -> Result<u64> {
let result = RETRY_POLICY
let result = FETCH_ACCOUNT_RETRY_POLICY
.retry_if(
move || rest_client.get_account_bcs(account_address),
|error: &RestError| !is_account_not_found(error),
Expand All @@ -294,7 +294,7 @@ fn is_account_not_found(error: &RestError) -> bool {
#[async_trait]
impl ReliableTransactionSubmitter for RestApiReliableTransactionSubmitter {
async fn get_account_balance(&self, account_address: AccountAddress) -> Result<u64> {
Ok(RETRY_POLICY
Ok(FETCH_ACCOUNT_RETRY_POLICY
.retry(move || {
self.random_rest_client()
.get_account_balance(account_address)
Expand Down

0 comments on commit f187562

Please sign in to comment.