Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compat expansion; forge refactor #13302

Merged
merged 35 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
ff39282
batch_update_gradually
brianolson May 8, 2024
cd60397
big refactor towards multi-threading
brianolson May 16, 2024
633dca2
upgrade_and_gather_stats() on all phases
brianolson May 16, 2024
e5d5d5e
lock reference fix
brianolson May 17, 2024
0fafd8e
forge re-re-refactor
brianolson May 31, 2024
5980849
try not to create a tokio runtime within a tokio runtime
brianolson Jun 3, 2024
835735b
async fn generate_traffic()
brianolson Jun 4, 2024
0e58a24
Merge remote-tracking branch 'origin/main' into compat2-big-refactor
brianolson Jun 4, 2024
dded354
tweak scope-drop stuff
brianolson Jun 4, 2024
323d334
another try at not recursively breaking tokio runtimes
brianolson Jun 4, 2024
8b1ceda
fmt
brianolson Jun 4, 2024
8d05c3d
async_trait NetworkTest.run
brianolson Jun 6, 2024
ef91fbf
NetworkLoadTest -> async_trait
brianolson Jun 7, 2024
d4d7a91
one less block_on
brianolson Jun 7, 2024
2876d26
less block_on
brianolson Jun 7, 2024
2fd0b98
async contagion spreads
brianolson Jun 7, 2024
38ca3ea
Merge branch 'main' into compat2-big-refactor
brianolson Jun 11, 2024
ce7b718
fix
brianolson Jun 11, 2024
1eb9a05
hack compat duration
brianolson Jun 11, 2024
8a18782
logging
brianolson Jun 11, 2024
36ba070
making parts of Swarm and Node traits &self from &mut self
brianolson Jun 12, 2024
27486a5
swarm loses a bunch of &mut things as mutability goes hidden by inter…
brianolson Jun 12, 2024
9049438
&mut swarm -> Arc<tokio::sync::RwLock<Box<dyn Swarm>>>
brianolson Jun 13, 2024
2a217e5
more async spread
brianolson Jun 17, 2024
1319320
Merge remote-tracking branch 'origin/main' into compat2-big-refactor
brianolson Jun 17, 2024
45c3670
fix
brianolson Jun 17, 2024
b5b119e
cargo lint
brianolson Jun 17, 2024
9d1e7f1
Merge remote-tracking branch 'origin/main' into compat2-big-refactor
brianolson Jun 17, 2024
5f80094
clenup update_seq_num_and_get_num_expired() which got refactored away
brianolson Jun 18, 2024
e6eb7b9
Merge branch 'main' into compat2-big-refactor
brianolson Jun 18, 2024
fc46210
PR cleanup
brianolson Jun 24, 2024
95e73dd
Merge remote-tracking branch 'origin/main' into compat2-big-refactor
brianolson Jun 24, 2024
a6bc708
cleanup
brianolson Jun 24, 2024
ebb0a48
Merge remote-tracking branch 'origin/main' into compat2-big-refactor
brianolson Jun 27, 2024
e138cfb
fix
brianolson Jun 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 33 additions & 25 deletions crates/transaction-emitter-lib/src/emitter/account_minter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use aptos_sdk::{
use aptos_transaction_generator_lib::{
CounterState, ReliableTransactionSubmitter, RootAccountHandle, SEND_AMOUNT,
};
use aptos_types::account_address::AccountAddress;
use core::{
cmp::min,
result::Result::{Err, Ok},
Expand All @@ -31,7 +32,7 @@ use std::{
};

pub struct SourceAccountManager<'t> {
pub source_account: &'t LocalAccount,
pub source_account: Arc<LocalAccount>,
pub txn_executor: &'t dyn ReliableTransactionSubmitter,
pub req: &'t EmitJobRequest,
pub txn_factory: TransactionFactory,
Expand All @@ -43,17 +44,21 @@ impl<'t> RootAccountHandle for SourceAccountManager<'t> {
self.check_approve_funds(amount, reason).await.unwrap();
}

fn get_root_account(&self) -> &LocalAccount {
self.source_account
fn get_root_account(&self) -> Arc<LocalAccount> {
self.source_account.clone()
}
}

impl<'t> SourceAccountManager<'t> {
fn source_account_address(&self) -> AccountAddress {
self.source_account.address()
}

// returns true if we might want to recheck the volume, as it was auto-approved.
async fn check_approve_funds(&self, amount: u64, reason: &str) -> Result<bool> {
let balance = self
.txn_executor
.get_account_balance(self.source_account.address())
.get_account_balance(self.source_account_address())
.await?;
Ok(if self.req.mint_to_root {
// We have a root account, so amount of funds minted is not a problem
Expand All @@ -63,7 +68,7 @@ impl<'t> SourceAccountManager<'t> {
if balance < amount.checked_mul(100).unwrap_or(u64::MAX / 2) {
info!(
"Mint account {} current balance is {}, needing {} for {}, minting to refil it fully",
self.source_account.address(),
self.source_account_address(),
balance,
amount,
reason,
Expand All @@ -74,7 +79,7 @@ impl<'t> SourceAccountManager<'t> {
} else {
info!(
"Mint account {} current balance is {}, needing {} for {}. Proceeding without minting, as balance would overflow otherwise",
self.source_account.address(),
self.source_account_address(),
balance,
amount,
reason,
Expand All @@ -85,7 +90,7 @@ impl<'t> SourceAccountManager<'t> {
} else {
info!(
"Source account {} current balance is {}, needed {} coins for {}, or {:.3}% of its balance",
self.source_account.address(),
self.source_account_address(),
balance,
amount,
reason,
Expand All @@ -95,7 +100,7 @@ impl<'t> SourceAccountManager<'t> {
if balance < amount {
return Err(anyhow!(
"Source ({}) doesn't have enough coins, balance {} < needed {} for {}",
self.source_account.address(),
self.source_account_address(),
balance,
amount,
reason
Expand Down Expand Up @@ -128,15 +133,15 @@ impl<'t> SourceAccountManager<'t> {
let txn = self
.source_account
.sign_with_transaction_builder(self.txn_factory.payload(
aptos_stdlib::aptos_coin_mint(self.source_account.address(), amount),
aptos_stdlib::aptos_coin_mint(self.source_account_address(), amount),
));

if let Err(e) = txn_executor.execute_transactions(&[txn]).await {
// This cannot work simultaneously across different txn emitters,
// so check on failure if another emitter has refilled it instead

let balance = txn_executor
.get_account_balance(self.source_account.address())
.get_account_balance(self.source_account_address())
.await?;
if balance > u64::MAX / 2 {
Ok(())
Expand Down Expand Up @@ -393,7 +398,7 @@ impl<'t> AccountMinter<'t> {

pub async fn create_and_fund_seed_accounts(
&mut self,
mut new_source_account: Option<LocalAccount>,
new_source_account: Option<LocalAccount>,
txn_executor: &dyn ReliableTransactionSubmitter,
account_generator: Box<dyn LocalAccountGenerator>,
seed_account_num: usize,
Expand All @@ -407,6 +412,10 @@ impl<'t> AccountMinter<'t> {
);
let mut i = 0;
let mut seed_accounts = vec![];
let source_account = match new_source_account {
None => self.source_account.get_root_account().clone(),
Some(param_account) => Arc::new(param_account),
};
while i < seed_account_num {
let batch_size = min(max_submit_batch_size, seed_account_num - i);
let mut rng = StdRng::from_rng(self.rng()).unwrap();
Expand All @@ -418,11 +427,7 @@ impl<'t> AccountMinter<'t> {
.iter()
.map(|account| {
create_and_fund_account_request(
if let Some(account) = &mut new_source_account {
account
} else {
self.source_account.get_root_account()
},
source_account.clone(),
coins_per_seed_account,
account.public_key(),
txn_factory,
Expand Down Expand Up @@ -470,16 +475,17 @@ impl<'t> AccountMinter<'t> {
coins_for_source: u64,
) -> Result<LocalAccount> {
const NUM_TRIES: usize = 3;
let root_account = self.source_account.get_root_account();
let root_address = root_account.address();
for i in 0..NUM_TRIES {
self.source_account.get_root_account().set_sequence_number(
txn_executor
.query_sequence_number(self.source_account.get_root_account().address())
.await?,
);
{
let new_sequence_number = txn_executor.query_sequence_number(root_address).await?;
root_account.set_sequence_number(new_sequence_number);
}

let new_source_account = LocalAccount::generate(self.rng());
let txn = create_and_fund_account_request(
self.source_account.get_root_account(),
root_account.clone(),
coins_for_source,
new_source_account.public_key(),
&self.txn_factory,
Expand Down Expand Up @@ -530,12 +536,14 @@ async fn create_and_fund_new_accounts(
.chunks(max_num_accounts_per_batch)
.map(|chunk| chunk.to_vec())
.collect::<Vec<_>>();
let source_address = source_account.address();
let source_account = Arc::new(source_account);
for batch in accounts_by_batch {
let creation_requests: Vec<_> = batch
.iter()
.map(|account| {
create_and_fund_account_request(
&source_account,
source_account.clone(),
coins_per_new_account,
account.public_key(),
txn_factory,
Expand All @@ -546,13 +554,13 @@ async fn create_and_fund_new_accounts(
txn_executor
.execute_transactions_with_counter(&creation_requests, counters)
.await
.with_context(|| format!("Account {} couldn't mint", source_account.address()))?;
.with_context(|| format!("Account {} couldn't mint", source_address))?;
}
Ok(())
}

pub fn create_and_fund_account_request(
creation_account: &LocalAccount,
creation_account: Arc<LocalAccount>,
amount: u64,
pubkey: &Ed25519PublicKey,
txn_factory: &TransactionFactory,
Expand Down
90 changes: 9 additions & 81 deletions crates/transaction-emitter-lib/src/emitter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::emitter::{
use again::RetryPolicy;
use anyhow::{ensure, format_err, Result};
use aptos_config::config::DEFAULT_MAX_SUBMIT_TRANSACTION_BATCH_SIZE;
use aptos_logger::{debug, error, info, sample, sample::SampleRate, warn};
use aptos_logger::{error, info, sample, sample::SampleRate, warn};
use aptos_rest_client::{aptos_api_types::AptosErrorCode, error::RestError, Client as RestClient};
use aptos_sdk::{
move_types::account_address::AccountAddress,
Expand Down Expand Up @@ -649,7 +649,7 @@ impl EmitJob {
}
}

#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct TxnEmitter {
txn_factory: TransactionFactory,
rng: StdRng,
Expand All @@ -673,7 +673,7 @@ impl TxnEmitter {

pub async fn start_job(
&mut self,
root_account: &LocalAccount,
root_account: Arc<LocalAccount>,
req: EmitJobRequest,
stats_tracking_phases: usize,
) -> Result<EmitJob> {
Expand Down Expand Up @@ -708,7 +708,7 @@ impl TxnEmitter {
let account_generator = create_account_generator(req.account_type);

let mut all_accounts = create_accounts(
root_account,
root_account.clone(),
&init_txn_factory,
account_generator,
&req,
Expand All @@ -730,7 +730,7 @@ impl TxnEmitter {
retry_after: req.init_retry_interval,
};
let source_account_manager = SourceAccountManager {
source_account: root_account,
source_account: root_account.clone(),
txn_executor: &txn_executor,
req: &req,
txn_factory: init_txn_factory.clone(),
Expand Down Expand Up @@ -819,7 +819,7 @@ impl TxnEmitter {

async fn emit_txn_for_impl(
mut self,
source_account: &LocalAccount,
source_account: Arc<LocalAccount>,
emit_job_request: EmitJobRequest,
duration: Duration,
print_stats_interval: Option<u64>,
Expand Down Expand Up @@ -855,7 +855,7 @@ impl TxnEmitter {

pub async fn emit_txn_for(
self,
source_account: &mut LocalAccount,
source_account: Arc<LocalAccount>,
emit_job_request: EmitJobRequest,
duration: Duration,
) -> Result<TxnStats> {
Expand All @@ -865,7 +865,7 @@ impl TxnEmitter {

pub async fn emit_txn_for_with_stats(
self,
source_account: &LocalAccount,
source_account: Arc<LocalAccount>,
emit_job_request: EmitJobRequest,
duration: Duration,
interval_secs: u64,
Expand Down Expand Up @@ -982,78 +982,6 @@ async fn wait_for_accounts_sequence(
(latest_fetched_counts, sum_of_completion_timestamps_millis)
}

fn update_seq_num_and_get_num_expired(
accounts: &mut [LocalAccount],
account_to_start_and_end_seq_num: HashMap<AccountAddress, (u64, u64)>,
latest_fetched_counts: HashMap<AccountAddress, u64>,
) -> (usize, usize) {
accounts.iter_mut().for_each(|account| {
let (start_seq_num, end_seq_num) =
if let Some(pair) = account_to_start_and_end_seq_num.get(&account.address()) {
pair
} else {
return;
};
assert!(account.sequence_number() == *end_seq_num);

match latest_fetched_counts.get(&account.address()) {
Some(count) => {
if *count != account.sequence_number() {
assert!(account.sequence_number() > *count);
debug!(
"Stale sequence_number for {}, expected {}, setting to {}",
account.address(),
account.sequence_number(),
count
);
account.set_sequence_number(*count);
}
},
None => {
debug!(
"Couldn't fetch sequence_number for {}, expected {}, setting to {}",
account.address(),
account.sequence_number(),
start_seq_num
);
account.set_sequence_number(*start_seq_num);
},
}
});

account_to_start_and_end_seq_num
.iter()
.map(
|(address, (start_seq_num, end_seq_num))| match latest_fetched_counts.get(address) {
Some(count) => {
assert!(
*count <= *end_seq_num,
"{address} :: {count} > {end_seq_num}"
);
if *count >= *start_seq_num {
(
(*count - *start_seq_num) as usize,
(*end_seq_num - *count) as usize,
)
} else {
debug!(
"Stale sequence_number fetched for {}, start_seq_num {}, fetched {}",
address, start_seq_num, *count
);
(0, (*end_seq_num - *start_seq_num) as usize)
}
},
None => (0, (end_seq_num - start_seq_num) as usize),
},
)
.fold(
(0, 0),
|(committed, expired), (cur_committed, cur_expired)| {
(committed + cur_committed, expired + cur_expired)
},
)
}

pub async fn query_sequence_number(client: &RestClient, address: AccountAddress) -> Result<u64> {
Ok(query_sequence_numbers(client, [address].iter()).await?.0[0].1)
}
Expand Down Expand Up @@ -1133,7 +1061,7 @@ pub fn parse_seed(seed_string: &str) -> [u8; 32] {
}

pub async fn create_accounts(
root_account: &LocalAccount,
root_account: Arc<LocalAccount>,
txn_factory: &TransactionFactory,
account_generator: Box<dyn LocalAccountGenerator>,
req: &EmitJobRequest,
Expand Down
Loading
Loading