Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor workflow delegator in transaction generator #15179

Merged
merged 3 commits into from
Nov 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 79 additions & 66 deletions crates/transaction-generator-lib/src/workflow_delegator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use aptos_sdk::{
types::{transaction::SignedTransaction, LocalAccount},
};
use std::{
cmp,
fmt::Debug,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
Expand All @@ -24,9 +24,10 @@ use std::{

#[derive(Clone)]
enum StageTracking {
// stage is externally modified
// Stage is externally modified. This is used by executor benchmark tests
ExternallySet(Arc<AtomicUsize>),
// we move to a next stage when all accounts have finished with the current stage
// We move to a next stage when all accounts have finished with the current stage
// This is used by transaction emitter (forge and tests on mainnet, devnet, testnet)
WhenDone {
stage_counter: Arc<AtomicUsize>,
stage_start_time: Arc<AtomicU64>,
Expand Down Expand Up @@ -70,7 +71,7 @@ impl StageTracking {
///
/// pool_i is filled by gen_i, and consumed by gen_i+1, and so there is one less pools than generators.
///
/// We start with stage 0, which calls gen_0 pool_per_stage times, which populates pool_0 with accounts.
/// We start with stage 0, which calls gen_0 stage_switch_conditions[0].len() times, which populates pool_0 with accounts.
///
/// After that, in stage 1, we call gen_1, which consumes accounts from pool_0, and moves them to pool_1.
/// We do this until pool_0 is empty.
Expand All @@ -86,26 +87,19 @@ impl StageTracking {
struct WorkflowTxnGenerator {
stage: StageTracking,
generators: Vec<Box<dyn TransactionGenerator>>,
pool_per_stage: Vec<Arc<ObjectPool<LocalAccount>>>,
num_for_first_stage: usize,
// Internal counter, so multiple workers (WorkflowTxnGenerator) can coordinate how many times to execute the first stage
completed_for_first_stage: Arc<AtomicUsize>,
stage_switch_conditions: Vec<StageSwitchCondition>,
}

impl WorkflowTxnGenerator {
fn new(
stage: StageTracking,
generators: Vec<Box<dyn TransactionGenerator>>,
pool_per_stage: Vec<Arc<ObjectPool<LocalAccount>>>,
num_for_first_stage: usize,
completed_for_first_stage: Arc<AtomicUsize>,
stage_switch_conditions: Vec<StageSwitchCondition>,
) -> Self {
Self {
stage,
generators,
pool_per_stage,
num_for_first_stage,
completed_for_first_stage,
stage_switch_conditions,
}
}
}
Expand All @@ -114,7 +108,7 @@ impl TransactionGenerator for WorkflowTxnGenerator {
fn generate_transactions(
&mut self,
account: &LocalAccount,
mut num_to_create: usize,
num_to_create: usize,
) -> Vec<SignedTransaction> {
assert_ne!(num_to_create, 0);
let stage = match self.stage.load_current_stage() {
Expand All @@ -128,42 +122,18 @@ impl TransactionGenerator for WorkflowTxnGenerator {
},
};

if stage == 0 {
// We can treat completed_for_first_stage as a stream of indices [0, +inf),
// where we want to execute only first num_for_first_stage (i.e. [0, num_for_first_stage) )
// So here we grab num_to_create "indices" from completed_for_first_stage counter,
// and then skip those that are in [num_for_first_stage, +inf) range.
let prev = self
.completed_for_first_stage
.fetch_add(num_to_create, Ordering::Relaxed);
num_to_create = cmp::min(num_to_create, self.num_for_first_stage.saturating_sub(prev));
}
// if stage is not 0, then grabing from the pool itself, inside of the generator.generate_transactions
// acts as coordinator, as it will generate as many transactions as number of accounts it could grab from the pool.

match &self.stage {
StageTracking::WhenDone {
stage_counter,
stage_start_time,
delay_between_stages,
} => {
if stage == 0 {
if num_to_create == 0 {
info!("TransactionGenerator Workflow: Stage 0 is full with {} accounts, moving to stage 1", self.pool_per_stage.first().unwrap().len());
stage_start_time.store(
StageTracking::current_timestamp() + delay_between_stages.as_secs(),
Ordering::Relaxed,
);
let _ = stage_counter.compare_exchange(
0,
1,
Ordering::Relaxed,
Ordering::Relaxed,
);
return Vec::new();
}
} else if stage < self.pool_per_stage.len()
&& self.pool_per_stage.get(stage - 1).unwrap().len() == 0
if stage < self.stage_switch_conditions.len()
&& self
.stage_switch_conditions
.get(stage)
.unwrap()
.should_switch()
{
info!("TransactionGenerator Workflow: Stage {} has consumed all accounts, moving to stage {}", stage, stage + 1);
stage_start_time.store(
Expand All @@ -180,48 +150,94 @@ impl TransactionGenerator for WorkflowTxnGenerator {
}
},
StageTracking::ExternallySet(_) => {
if stage == 0 && num_to_create == 0 {
if stage >= self.stage_switch_conditions.len()
|| (stage < self.stage_switch_conditions.len()
&& self
.stage_switch_conditions
.get(stage)
.unwrap()
.should_switch())
{
info!("TransactionGenerator Workflow: Stage {} has consumed all accounts, moving to stage {}", stage, stage + 1);
return Vec::new();
}
},
}

sample!(
SampleRate::Duration(Duration::from_secs(2)),
info!("Cur stage: {}, pool sizes: {:?}", stage, self.pool_per_stage.iter().map(|p| p.len()).collect::<Vec<_>>());
info!("Cur stage: {}, stage switch conditions: {:?}", stage, self.stage_switch_conditions);
);

let result = if let Some(generator) = self.generators.get_mut(stage) {
generator.generate_transactions(account, num_to_create)
} else {
Vec::new()
};

if let Some(switch_condition) = self.stage_switch_conditions.get_mut(stage) {
switch_condition.reduce_txn_count(result.len());
}
result
}
}

#[derive(Clone)]
enum StageSwitchCondition {
WhenPoolBecomesEmpty(Arc<ObjectPool<LocalAccount>>),
MaxTransactions(Arc<AtomicUsize>),
}

impl StageSwitchCondition {
fn should_switch(&self) -> bool {
match self {
StageSwitchCondition::WhenPoolBecomesEmpty(pool) => pool.len() == 0,
StageSwitchCondition::MaxTransactions(max) => max.load(Ordering::Relaxed) == 0,
}
}

fn reduce_txn_count(&mut self, count: usize) {
match self {
StageSwitchCondition::WhenPoolBecomesEmpty(_) => {},
StageSwitchCondition::MaxTransactions(max) => {
let current = max.load(Ordering::Relaxed);
if count > current {
max.store(0, Ordering::Relaxed);
} else {
max.fetch_sub(count, Ordering::Relaxed);
}
},
}
}
}
impl Debug for StageSwitchCondition {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StageSwitchCondition::WhenPoolBecomesEmpty(pool) => {
write!(f, "WhenPoolBecomesEmpty({})", pool.len())
},
StageSwitchCondition::MaxTransactions(max) => {
write!(f, "MaxTransactions({})", max.load(Ordering::Relaxed))
},
}
}
}

pub struct WorkflowTxnGeneratorCreator {
stage: StageTracking,
creators: Vec<Box<dyn TransactionGeneratorCreator>>,
pool_per_stage: Vec<Arc<ObjectPool<LocalAccount>>>,
num_for_first_stage: usize,
completed_for_first_stage: Arc<AtomicUsize>,
stage_switch_conditions: Vec<StageSwitchCondition>,
}

impl WorkflowTxnGeneratorCreator {
fn new(
stage: StageTracking,
creators: Vec<Box<dyn TransactionGeneratorCreator>>,
pool_per_stage: Vec<Arc<ObjectPool<LocalAccount>>>,
num_for_first_stage: usize,
stage_switch_conditions: Vec<StageSwitchCondition>,
) -> Self {
Self {
stage,
creators,
pool_per_stage,
num_for_first_stage,
completed_for_first_stage: Arc::new(AtomicUsize::new(0)),
stage_switch_conditions,
}
}

Expand Down Expand Up @@ -273,7 +289,7 @@ impl WorkflowTxnGeneratorCreator {
txn_executor,
num_modules,
mint_entry_point.package_name(),
Some(20_00000000),
Some(40_0000_0000),
)
.await;

Expand Down Expand Up @@ -327,12 +343,11 @@ impl WorkflowTxnGeneratorCreator {
Some(burnt_pool.clone()),
)),
];
Self::new(
stage_tracking,
creators,
vec![created_pool, minted_pool, burnt_pool],
count,
)
Self::new(stage_tracking, creators, vec![
StageSwitchCondition::MaxTransactions(Arc::new(AtomicUsize::new(count))),
StageSwitchCondition::WhenPoolBecomesEmpty(created_pool),
StageSwitchCondition::WhenPoolBecomesEmpty(minted_pool),
])
},
}
}
Expand All @@ -346,9 +361,7 @@ impl TransactionGeneratorCreator for WorkflowTxnGeneratorCreator {
.iter()
.map(|c| c.create_transaction_generator())
.collect(),
self.pool_per_stage.clone(),
self.num_for_first_stage,
self.completed_for_first_stage.clone(),
self.stage_switch_conditions.clone(),
))
}
}
Loading