From 1a7ab6fe653a05ec3555d271e501d20c43ec361f Mon Sep 17 00:00:00 2001 From: Satya Vusirikala Date: Mon, 4 Nov 2024 11:19:55 -0800 Subject: [PATCH 1/3] Refactor workflow delegator to use stop stage condition --- .../src/workflow_delegator.rs | 147 ++++++++++-------- 1 file changed, 80 insertions(+), 67 deletions(-) diff --git a/crates/transaction-generator-lib/src/workflow_delegator.rs b/crates/transaction-generator-lib/src/workflow_delegator.rs index 439b68e056780..5af82400b4962 100644 --- a/crates/transaction-generator-lib/src/workflow_delegator.rs +++ b/crates/transaction-generator-lib/src/workflow_delegator.rs @@ -14,7 +14,7 @@ use aptos_sdk::{ types::{transaction::SignedTransaction, LocalAccount}, }; use std::{ - cmp, + fmt::Debug, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, @@ -24,9 +24,10 @@ use std::{ #[derive(Clone)] enum StageTracking { - // stage is externally modified + // Stage is externally modified. This is used by executor benchmark tests ExternallySet(Arc), - // we move to a next stage when all accounts have finished with the current stage + // We move to a next stage when all accounts have finished with the current stage + // This is used by transaction emitter (forge and tests on mainnet, devnet, testnet) WhenDone { stage_counter: Arc, stage_start_time: Arc, @@ -70,7 +71,7 @@ impl StageTracking { /// /// pool_i is filled by gen_i, and consumed by gen_i+1, and so there is one less pools than generators. /// -/// We start with stage 0, which calls gen_0 pool_per_stage times, which populates pool_0 with accounts. +/// We start with stage 0, which calls gen_0 stop_condition_per_stage[0].len() times, which populates pool_0 with accounts. /// /// After that, in stage 1, we call gen_1, which consumes accounts from pool_0, and moves them to pool_1. /// We do this until pool_0 is empty. @@ -86,26 +87,19 @@ impl StageTracking { struct WorkflowTxnGenerator { stage: StageTracking, generators: Vec>, - pool_per_stage: Vec>>, - num_for_first_stage: usize, - // Internal counter, so multiple workers (WorkflowTxnGenerator) can coordinate how many times to execute the first stage - completed_for_first_stage: Arc, + stop_condition_per_stage: Vec, } impl WorkflowTxnGenerator { fn new( stage: StageTracking, generators: Vec>, - pool_per_stage: Vec>>, - num_for_first_stage: usize, - completed_for_first_stage: Arc, + stop_condition_per_stage: Vec, ) -> Self { Self { stage, generators, - pool_per_stage, - num_for_first_stage, - completed_for_first_stage, + stop_condition_per_stage, } } } @@ -114,7 +108,7 @@ impl TransactionGenerator for WorkflowTxnGenerator { fn generate_transactions( &mut self, account: &LocalAccount, - mut num_to_create: usize, + num_to_create: usize, ) -> Vec { assert_ne!(num_to_create, 0); let stage = match self.stage.load_current_stage() { @@ -128,42 +122,18 @@ impl TransactionGenerator for WorkflowTxnGenerator { }, }; - if stage == 0 { - // We can treat completed_for_first_stage as a stream of indices [0, +inf), - // where we want to execute only first num_for_first_stage (i.e. [0, num_for_first_stage) ) - // So here we grab num_to_create "indices" from completed_for_first_stage counter, - // and then skip those that are in [num_for_first_stage, +inf) range. - let prev = self - .completed_for_first_stage - .fetch_add(num_to_create, Ordering::Relaxed); - num_to_create = cmp::min(num_to_create, self.num_for_first_stage.saturating_sub(prev)); - } - // if stage is not 0, then grabing from the pool itself, inside of the generator.generate_transactions - // acts as coordinator, as it will generate as many transactions as number of accounts it could grab from the pool. - match &self.stage { StageTracking::WhenDone { stage_counter, stage_start_time, delay_between_stages, } => { - if stage == 0 { - if num_to_create == 0 { - info!("TransactionGenerator Workflow: Stage 0 is full with {} accounts, moving to stage 1", self.pool_per_stage.first().unwrap().len()); - stage_start_time.store( - StageTracking::current_timestamp() + delay_between_stages.as_secs(), - Ordering::Relaxed, - ); - let _ = stage_counter.compare_exchange( - 0, - 1, - Ordering::Relaxed, - Ordering::Relaxed, - ); - return Vec::new(); - } - } else if stage < self.pool_per_stage.len() - && self.pool_per_stage.get(stage - 1).unwrap().len() == 0 + if stage < self.stop_condition_per_stage.len() + && self + .stop_condition_per_stage + .get(stage) + .unwrap() + .should_stop() { info!("TransactionGenerator Workflow: Stage {} has consumed all accounts, moving to stage {}", stage, stage + 1); stage_start_time.store( @@ -180,15 +150,23 @@ impl TransactionGenerator for WorkflowTxnGenerator { } }, StageTracking::ExternallySet(_) => { - if stage == 0 && num_to_create == 0 { + if stage >= self.stop_condition_per_stage.len() + || (stage < self.stop_condition_per_stage.len() + && self + .stop_condition_per_stage + .get(stage) + .unwrap() + .should_stop()) + { + info!("TransactionGenerator Workflow: Stage {} has consumed all accounts, moving to stage {}", stage, stage + 1); return Vec::new(); } }, } sample!( - SampleRate::Duration(Duration::from_secs(2)), - info!("Cur stage: {}, pool sizes: {:?}", stage, self.pool_per_stage.iter().map(|p| p.len()).collect::>()); + SampleRate::Duration(Duration::from_millis(250)), + info!("Cur stage: {}, stop conditions per stage: {:?}", stage, self.stop_condition_per_stage); ); let result = if let Some(generator) = self.generators.get_mut(stage) { @@ -196,32 +174,70 @@ impl TransactionGenerator for WorkflowTxnGenerator { } else { Vec::new() }; - + if let Some(stop_condition) = self.stop_condition_per_stage.get_mut(stage) { + stop_condition.reduce_txn_count(result.len()); + } result } } +#[derive(Clone)] +enum StageStopCondition { + WhenPoolBecomesEmpty(Arc>), + MaxTransactions(Arc), +} + +impl StageStopCondition { + fn should_stop(&self) -> bool { + match self { + StageStopCondition::WhenPoolBecomesEmpty(pool) => pool.len() == 0, + StageStopCondition::MaxTransactions(max) => max.load(Ordering::Relaxed) == 0, + } + } + + fn reduce_txn_count(&mut self, count: usize) { + match self { + StageStopCondition::WhenPoolBecomesEmpty(_) => {}, + StageStopCondition::MaxTransactions(max) => { + let current = max.load(Ordering::Relaxed); + if count > current { + max.store(0, Ordering::Relaxed); + } else { + max.fetch_sub(count, Ordering::Relaxed); + } + }, + } + } +} +impl Debug for StageStopCondition { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + StageStopCondition::WhenPoolBecomesEmpty(pool) => { + write!(f, "WhenPoolBecomesEmpty({})", pool.len()) + }, + StageStopCondition::MaxTransactions(max) => { + write!(f, "MaxTransactions({})", max.load(Ordering::Relaxed)) + }, + } + } +} + pub struct WorkflowTxnGeneratorCreator { stage: StageTracking, creators: Vec>, - pool_per_stage: Vec>>, - num_for_first_stage: usize, - completed_for_first_stage: Arc, + stop_condition_per_stage: Vec, } impl WorkflowTxnGeneratorCreator { fn new( stage: StageTracking, creators: Vec>, - pool_per_stage: Vec>>, - num_for_first_stage: usize, + stop_condition_per_stage: Vec, ) -> Self { Self { stage, creators, - pool_per_stage, - num_for_first_stage, - completed_for_first_stage: Arc::new(AtomicUsize::new(0)), + stop_condition_per_stage, } } @@ -273,7 +289,7 @@ impl WorkflowTxnGeneratorCreator { txn_executor, num_modules, mint_entry_point.package_name(), - Some(20_00000000), + Some(40_0000_0000), ) .await; @@ -327,12 +343,11 @@ impl WorkflowTxnGeneratorCreator { Some(burnt_pool.clone()), )), ]; - Self::new( - stage_tracking, - creators, - vec![created_pool, minted_pool, burnt_pool], - count, - ) + Self::new(stage_tracking, creators, vec![ + StageStopCondition::MaxTransactions(Arc::new(AtomicUsize::new(count))), + StageStopCondition::WhenPoolBecomesEmpty(created_pool), + StageStopCondition::WhenPoolBecomesEmpty(minted_pool), + ]) }, } } @@ -346,9 +361,7 @@ impl TransactionGeneratorCreator for WorkflowTxnGeneratorCreator { .iter() .map(|c| c.create_transaction_generator()) .collect(), - self.pool_per_stage.clone(), - self.num_for_first_stage, - self.completed_for_first_stage.clone(), + self.stop_condition_per_stage.clone(), )) } } From e200586808cfd2df821d2960bd3ce4244811e9d6 Mon Sep 17 00:00:00 2001 From: Satya Vusirikala Date: Mon, 4 Nov 2024 11:26:11 -0800 Subject: [PATCH 2/3] Renaming --- .../src/workflow_delegator.rs | 62 +++++++++---------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/crates/transaction-generator-lib/src/workflow_delegator.rs b/crates/transaction-generator-lib/src/workflow_delegator.rs index 5af82400b4962..12f067c3c8552 100644 --- a/crates/transaction-generator-lib/src/workflow_delegator.rs +++ b/crates/transaction-generator-lib/src/workflow_delegator.rs @@ -71,7 +71,7 @@ impl StageTracking { /// /// pool_i is filled by gen_i, and consumed by gen_i+1, and so there is one less pools than generators. /// -/// We start with stage 0, which calls gen_0 stop_condition_per_stage[0].len() times, which populates pool_0 with accounts. +/// We start with stage 0, which calls gen_0 stage_switch_conditions[0].len() times, which populates pool_0 with accounts. /// /// After that, in stage 1, we call gen_1, which consumes accounts from pool_0, and moves them to pool_1. /// We do this until pool_0 is empty. @@ -87,19 +87,19 @@ impl StageTracking { struct WorkflowTxnGenerator { stage: StageTracking, generators: Vec>, - stop_condition_per_stage: Vec, + stage_switch_conditions: Vec, } impl WorkflowTxnGenerator { fn new( stage: StageTracking, generators: Vec>, - stop_condition_per_stage: Vec, + stage_switch_conditions: Vec, ) -> Self { Self { stage, generators, - stop_condition_per_stage, + stage_switch_conditions, } } } @@ -128,12 +128,12 @@ impl TransactionGenerator for WorkflowTxnGenerator { stage_start_time, delay_between_stages, } => { - if stage < self.stop_condition_per_stage.len() + if stage < self.stage_switch_conditions.len() && self - .stop_condition_per_stage + .stage_switch_conditions .get(stage) .unwrap() - .should_stop() + .should_switch() { info!("TransactionGenerator Workflow: Stage {} has consumed all accounts, moving to stage {}", stage, stage + 1); stage_start_time.store( @@ -150,13 +150,13 @@ impl TransactionGenerator for WorkflowTxnGenerator { } }, StageTracking::ExternallySet(_) => { - if stage >= self.stop_condition_per_stage.len() - || (stage < self.stop_condition_per_stage.len() + if stage >= self.stage_switch_conditions.len() + || (stage < self.stage_switch_conditions.len() && self - .stop_condition_per_stage + .stage_switch_conditions .get(stage) .unwrap() - .should_stop()) + .should_switch()) { info!("TransactionGenerator Workflow: Stage {} has consumed all accounts, moving to stage {}", stage, stage + 1); return Vec::new(); @@ -166,7 +166,7 @@ impl TransactionGenerator for WorkflowTxnGenerator { sample!( SampleRate::Duration(Duration::from_millis(250)), - info!("Cur stage: {}, stop conditions per stage: {:?}", stage, self.stop_condition_per_stage); + info!("Cur stage: {}, stage switch conditions: {:?}", stage, self.stage_switch_conditions); ); let result = if let Some(generator) = self.generators.get_mut(stage) { @@ -174,31 +174,31 @@ impl TransactionGenerator for WorkflowTxnGenerator { } else { Vec::new() }; - if let Some(stop_condition) = self.stop_condition_per_stage.get_mut(stage) { - stop_condition.reduce_txn_count(result.len()); + if let Some(switch_condition) = self.stage_switch_conditions.get_mut(stage) { + switch_condition.reduce_txn_count(result.len()); } result } } #[derive(Clone)] -enum StageStopCondition { +enum StageSwitchCondition { WhenPoolBecomesEmpty(Arc>), MaxTransactions(Arc), } -impl StageStopCondition { - fn should_stop(&self) -> bool { +impl StageSwitchCondition { + fn should_switch(&self) -> bool { match self { - StageStopCondition::WhenPoolBecomesEmpty(pool) => pool.len() == 0, - StageStopCondition::MaxTransactions(max) => max.load(Ordering::Relaxed) == 0, + StageSwitchCondition::WhenPoolBecomesEmpty(pool) => pool.len() == 0, + StageSwitchCondition::MaxTransactions(max) => max.load(Ordering::Relaxed) == 0, } } fn reduce_txn_count(&mut self, count: usize) { match self { - StageStopCondition::WhenPoolBecomesEmpty(_) => {}, - StageStopCondition::MaxTransactions(max) => { + StageSwitchCondition::WhenPoolBecomesEmpty(_) => {}, + StageSwitchCondition::MaxTransactions(max) => { let current = max.load(Ordering::Relaxed); if count > current { max.store(0, Ordering::Relaxed); @@ -209,13 +209,13 @@ impl StageStopCondition { } } } -impl Debug for StageStopCondition { +impl Debug for StageSwitchCondition { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - StageStopCondition::WhenPoolBecomesEmpty(pool) => { + StageSwitchCondition::WhenPoolBecomesEmpty(pool) => { write!(f, "WhenPoolBecomesEmpty({})", pool.len()) }, - StageStopCondition::MaxTransactions(max) => { + StageSwitchCondition::MaxTransactions(max) => { write!(f, "MaxTransactions({})", max.load(Ordering::Relaxed)) }, } @@ -225,19 +225,19 @@ impl Debug for StageStopCondition { pub struct WorkflowTxnGeneratorCreator { stage: StageTracking, creators: Vec>, - stop_condition_per_stage: Vec, + stage_switch_conditions: Vec, } impl WorkflowTxnGeneratorCreator { fn new( stage: StageTracking, creators: Vec>, - stop_condition_per_stage: Vec, + stage_switch_conditions: Vec, ) -> Self { Self { stage, creators, - stop_condition_per_stage, + stage_switch_conditions, } } @@ -344,9 +344,9 @@ impl WorkflowTxnGeneratorCreator { )), ]; Self::new(stage_tracking, creators, vec![ - StageStopCondition::MaxTransactions(Arc::new(AtomicUsize::new(count))), - StageStopCondition::WhenPoolBecomesEmpty(created_pool), - StageStopCondition::WhenPoolBecomesEmpty(minted_pool), + StageSwitchCondition::MaxTransactions(Arc::new(AtomicUsize::new(count))), + StageSwitchCondition::WhenPoolBecomesEmpty(created_pool), + StageSwitchCondition::WhenPoolBecomesEmpty(minted_pool), ]) }, } @@ -361,7 +361,7 @@ impl TransactionGeneratorCreator for WorkflowTxnGeneratorCreator { .iter() .map(|c| c.create_transaction_generator()) .collect(), - self.stop_condition_per_stage.clone(), + self.stage_switch_conditions.clone(), )) } } From a2459a55f30e2760205171644a71424f8454b350 Mon Sep 17 00:00:00 2001 From: Satya Vusirikala Date: Mon, 4 Nov 2024 11:37:10 -0800 Subject: [PATCH 3/3] Minor change --- crates/transaction-generator-lib/src/workflow_delegator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/transaction-generator-lib/src/workflow_delegator.rs b/crates/transaction-generator-lib/src/workflow_delegator.rs index 12f067c3c8552..2252fbb4aab60 100644 --- a/crates/transaction-generator-lib/src/workflow_delegator.rs +++ b/crates/transaction-generator-lib/src/workflow_delegator.rs @@ -165,7 +165,7 @@ impl TransactionGenerator for WorkflowTxnGenerator { } sample!( - SampleRate::Duration(Duration::from_millis(250)), + SampleRate::Duration(Duration::from_secs(2)), info!("Cur stage: {}, stage switch conditions: {:?}", stage, self.stage_switch_conditions); );