diff --git a/crates/transaction-generator-lib/src/workflow_delegator.rs b/crates/transaction-generator-lib/src/workflow_delegator.rs index 439b68e056780..2252fbb4aab60 100644 --- a/crates/transaction-generator-lib/src/workflow_delegator.rs +++ b/crates/transaction-generator-lib/src/workflow_delegator.rs @@ -14,7 +14,7 @@ use aptos_sdk::{ types::{transaction::SignedTransaction, LocalAccount}, }; use std::{ - cmp, + fmt::Debug, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, @@ -24,9 +24,10 @@ use std::{ #[derive(Clone)] enum StageTracking { - // stage is externally modified + // Stage is externally modified. This is used by executor benchmark tests ExternallySet(Arc), - // we move to a next stage when all accounts have finished with the current stage + // We move to a next stage when all accounts have finished with the current stage + // This is used by transaction emitter (forge and tests on mainnet, devnet, testnet) WhenDone { stage_counter: Arc, stage_start_time: Arc, @@ -70,7 +71,7 @@ impl StageTracking { /// /// pool_i is filled by gen_i, and consumed by gen_i+1, and so there is one less pools than generators. /// -/// We start with stage 0, which calls gen_0 pool_per_stage times, which populates pool_0 with accounts. +/// We start with stage 0, which calls gen_0 stage_switch_conditions[0].len() times, which populates pool_0 with accounts. /// /// After that, in stage 1, we call gen_1, which consumes accounts from pool_0, and moves them to pool_1. /// We do this until pool_0 is empty. @@ -86,26 +87,19 @@ impl StageTracking { struct WorkflowTxnGenerator { stage: StageTracking, generators: Vec>, - pool_per_stage: Vec>>, - num_for_first_stage: usize, - // Internal counter, so multiple workers (WorkflowTxnGenerator) can coordinate how many times to execute the first stage - completed_for_first_stage: Arc, + stage_switch_conditions: Vec, } impl WorkflowTxnGenerator { fn new( stage: StageTracking, generators: Vec>, - pool_per_stage: Vec>>, - num_for_first_stage: usize, - completed_for_first_stage: Arc, + stage_switch_conditions: Vec, ) -> Self { Self { stage, generators, - pool_per_stage, - num_for_first_stage, - completed_for_first_stage, + stage_switch_conditions, } } } @@ -114,7 +108,7 @@ impl TransactionGenerator for WorkflowTxnGenerator { fn generate_transactions( &mut self, account: &LocalAccount, - mut num_to_create: usize, + num_to_create: usize, ) -> Vec { assert_ne!(num_to_create, 0); let stage = match self.stage.load_current_stage() { @@ -128,42 +122,18 @@ impl TransactionGenerator for WorkflowTxnGenerator { }, }; - if stage == 0 { - // We can treat completed_for_first_stage as a stream of indices [0, +inf), - // where we want to execute only first num_for_first_stage (i.e. [0, num_for_first_stage) ) - // So here we grab num_to_create "indices" from completed_for_first_stage counter, - // and then skip those that are in [num_for_first_stage, +inf) range. - let prev = self - .completed_for_first_stage - .fetch_add(num_to_create, Ordering::Relaxed); - num_to_create = cmp::min(num_to_create, self.num_for_first_stage.saturating_sub(prev)); - } - // if stage is not 0, then grabing from the pool itself, inside of the generator.generate_transactions - // acts as coordinator, as it will generate as many transactions as number of accounts it could grab from the pool. - match &self.stage { StageTracking::WhenDone { stage_counter, stage_start_time, delay_between_stages, } => { - if stage == 0 { - if num_to_create == 0 { - info!("TransactionGenerator Workflow: Stage 0 is full with {} accounts, moving to stage 1", self.pool_per_stage.first().unwrap().len()); - stage_start_time.store( - StageTracking::current_timestamp() + delay_between_stages.as_secs(), - Ordering::Relaxed, - ); - let _ = stage_counter.compare_exchange( - 0, - 1, - Ordering::Relaxed, - Ordering::Relaxed, - ); - return Vec::new(); - } - } else if stage < self.pool_per_stage.len() - && self.pool_per_stage.get(stage - 1).unwrap().len() == 0 + if stage < self.stage_switch_conditions.len() + && self + .stage_switch_conditions + .get(stage) + .unwrap() + .should_switch() { info!("TransactionGenerator Workflow: Stage {} has consumed all accounts, moving to stage {}", stage, stage + 1); stage_start_time.store( @@ -180,7 +150,15 @@ impl TransactionGenerator for WorkflowTxnGenerator { } }, StageTracking::ExternallySet(_) => { - if stage == 0 && num_to_create == 0 { + if stage >= self.stage_switch_conditions.len() + || (stage < self.stage_switch_conditions.len() + && self + .stage_switch_conditions + .get(stage) + .unwrap() + .should_switch()) + { + info!("TransactionGenerator Workflow: Stage {} has consumed all accounts, moving to stage {}", stage, stage + 1); return Vec::new(); } }, @@ -188,7 +166,7 @@ impl TransactionGenerator for WorkflowTxnGenerator { sample!( SampleRate::Duration(Duration::from_secs(2)), - info!("Cur stage: {}, pool sizes: {:?}", stage, self.pool_per_stage.iter().map(|p| p.len()).collect::>()); + info!("Cur stage: {}, stage switch conditions: {:?}", stage, self.stage_switch_conditions); ); let result = if let Some(generator) = self.generators.get_mut(stage) { @@ -196,32 +174,70 @@ impl TransactionGenerator for WorkflowTxnGenerator { } else { Vec::new() }; - + if let Some(switch_condition) = self.stage_switch_conditions.get_mut(stage) { + switch_condition.reduce_txn_count(result.len()); + } result } } +#[derive(Clone)] +enum StageSwitchCondition { + WhenPoolBecomesEmpty(Arc>), + MaxTransactions(Arc), +} + +impl StageSwitchCondition { + fn should_switch(&self) -> bool { + match self { + StageSwitchCondition::WhenPoolBecomesEmpty(pool) => pool.len() == 0, + StageSwitchCondition::MaxTransactions(max) => max.load(Ordering::Relaxed) == 0, + } + } + + fn reduce_txn_count(&mut self, count: usize) { + match self { + StageSwitchCondition::WhenPoolBecomesEmpty(_) => {}, + StageSwitchCondition::MaxTransactions(max) => { + let current = max.load(Ordering::Relaxed); + if count > current { + max.store(0, Ordering::Relaxed); + } else { + max.fetch_sub(count, Ordering::Relaxed); + } + }, + } + } +} +impl Debug for StageSwitchCondition { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + StageSwitchCondition::WhenPoolBecomesEmpty(pool) => { + write!(f, "WhenPoolBecomesEmpty({})", pool.len()) + }, + StageSwitchCondition::MaxTransactions(max) => { + write!(f, "MaxTransactions({})", max.load(Ordering::Relaxed)) + }, + } + } +} + pub struct WorkflowTxnGeneratorCreator { stage: StageTracking, creators: Vec>, - pool_per_stage: Vec>>, - num_for_first_stage: usize, - completed_for_first_stage: Arc, + stage_switch_conditions: Vec, } impl WorkflowTxnGeneratorCreator { fn new( stage: StageTracking, creators: Vec>, - pool_per_stage: Vec>>, - num_for_first_stage: usize, + stage_switch_conditions: Vec, ) -> Self { Self { stage, creators, - pool_per_stage, - num_for_first_stage, - completed_for_first_stage: Arc::new(AtomicUsize::new(0)), + stage_switch_conditions, } } @@ -273,7 +289,7 @@ impl WorkflowTxnGeneratorCreator { txn_executor, num_modules, mint_entry_point.package_name(), - Some(20_00000000), + Some(40_0000_0000), ) .await; @@ -327,12 +343,11 @@ impl WorkflowTxnGeneratorCreator { Some(burnt_pool.clone()), )), ]; - Self::new( - stage_tracking, - creators, - vec![created_pool, minted_pool, burnt_pool], - count, - ) + Self::new(stage_tracking, creators, vec![ + StageSwitchCondition::MaxTransactions(Arc::new(AtomicUsize::new(count))), + StageSwitchCondition::WhenPoolBecomesEmpty(created_pool), + StageSwitchCondition::WhenPoolBecomesEmpty(minted_pool), + ]) }, } } @@ -346,9 +361,7 @@ impl TransactionGeneratorCreator for WorkflowTxnGeneratorCreator { .iter() .map(|c| c.create_transaction_generator()) .collect(), - self.pool_per_stage.clone(), - self.num_for_first_stage, - self.completed_for_first_stage.clone(), + self.stage_switch_conditions.clone(), )) } }