Skip to content

Commit

Permalink
Renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
vusirikala committed Nov 4, 2024
1 parent 1a7ab6f commit e200586
Showing 1 changed file with 31 additions and 31 deletions.
62 changes: 31 additions & 31 deletions crates/transaction-generator-lib/src/workflow_delegator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl StageTracking {
///
/// pool_i is filled by gen_i, and consumed by gen_i+1, and so there is one less pools than generators.
///
/// We start with stage 0, which calls gen_0 stop_condition_per_stage[0].len() times, which populates pool_0 with accounts.
/// We start with stage 0, which calls gen_0 stage_switch_conditions[0].len() times, which populates pool_0 with accounts.
///
/// After that, in stage 1, we call gen_1, which consumes accounts from pool_0, and moves them to pool_1.
/// We do this until pool_0 is empty.
Expand All @@ -87,19 +87,19 @@ impl StageTracking {
struct WorkflowTxnGenerator {
stage: StageTracking,
generators: Vec<Box<dyn TransactionGenerator>>,
stop_condition_per_stage: Vec<StageStopCondition>,
stage_switch_conditions: Vec<StageSwitchCondition>,
}

impl WorkflowTxnGenerator {
fn new(
stage: StageTracking,
generators: Vec<Box<dyn TransactionGenerator>>,
stop_condition_per_stage: Vec<StageStopCondition>,
stage_switch_conditions: Vec<StageSwitchCondition>,
) -> Self {
Self {
stage,
generators,
stop_condition_per_stage,
stage_switch_conditions,
}
}
}
Expand Down Expand Up @@ -128,12 +128,12 @@ impl TransactionGenerator for WorkflowTxnGenerator {
stage_start_time,
delay_between_stages,
} => {
if stage < self.stop_condition_per_stage.len()
if stage < self.stage_switch_conditions.len()
&& self
.stop_condition_per_stage
.stage_switch_conditions
.get(stage)
.unwrap()
.should_stop()
.should_switch()
{
info!("TransactionGenerator Workflow: Stage {} has consumed all accounts, moving to stage {}", stage, stage + 1);
stage_start_time.store(
Expand All @@ -150,13 +150,13 @@ impl TransactionGenerator for WorkflowTxnGenerator {
}
},
StageTracking::ExternallySet(_) => {
if stage >= self.stop_condition_per_stage.len()
|| (stage < self.stop_condition_per_stage.len()
if stage >= self.stage_switch_conditions.len()
|| (stage < self.stage_switch_conditions.len()
&& self
.stop_condition_per_stage
.stage_switch_conditions
.get(stage)
.unwrap()
.should_stop())
.should_switch())
{
info!("TransactionGenerator Workflow: Stage {} has consumed all accounts, moving to stage {}", stage, stage + 1);
return Vec::new();
Expand All @@ -166,39 +166,39 @@ impl TransactionGenerator for WorkflowTxnGenerator {

sample!(
SampleRate::Duration(Duration::from_millis(250)),
info!("Cur stage: {}, stop conditions per stage: {:?}", stage, self.stop_condition_per_stage);
info!("Cur stage: {}, stage switch conditions: {:?}", stage, self.stage_switch_conditions);
);

let result = if let Some(generator) = self.generators.get_mut(stage) {
generator.generate_transactions(account, num_to_create)
} else {
Vec::new()
};
if let Some(stop_condition) = self.stop_condition_per_stage.get_mut(stage) {
stop_condition.reduce_txn_count(result.len());
if let Some(switch_condition) = self.stage_switch_conditions.get_mut(stage) {
switch_condition.reduce_txn_count(result.len());
}
result
}
}

#[derive(Clone)]
enum StageStopCondition {
enum StageSwitchCondition {
WhenPoolBecomesEmpty(Arc<ObjectPool<LocalAccount>>),
MaxTransactions(Arc<AtomicUsize>),
}

impl StageStopCondition {
fn should_stop(&self) -> bool {
impl StageSwitchCondition {
fn should_switch(&self) -> bool {
match self {
StageStopCondition::WhenPoolBecomesEmpty(pool) => pool.len() == 0,
StageStopCondition::MaxTransactions(max) => max.load(Ordering::Relaxed) == 0,
StageSwitchCondition::WhenPoolBecomesEmpty(pool) => pool.len() == 0,
StageSwitchCondition::MaxTransactions(max) => max.load(Ordering::Relaxed) == 0,
}
}

fn reduce_txn_count(&mut self, count: usize) {
match self {
StageStopCondition::WhenPoolBecomesEmpty(_) => {},
StageStopCondition::MaxTransactions(max) => {
StageSwitchCondition::WhenPoolBecomesEmpty(_) => {},
StageSwitchCondition::MaxTransactions(max) => {
let current = max.load(Ordering::Relaxed);
if count > current {
max.store(0, Ordering::Relaxed);
Expand All @@ -209,13 +209,13 @@ impl StageStopCondition {
}
}
}
impl Debug for StageStopCondition {
impl Debug for StageSwitchCondition {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StageStopCondition::WhenPoolBecomesEmpty(pool) => {
StageSwitchCondition::WhenPoolBecomesEmpty(pool) => {
write!(f, "WhenPoolBecomesEmpty({})", pool.len())
},
StageStopCondition::MaxTransactions(max) => {
StageSwitchCondition::MaxTransactions(max) => {
write!(f, "MaxTransactions({})", max.load(Ordering::Relaxed))
},
}
Expand All @@ -225,19 +225,19 @@ impl Debug for StageStopCondition {
pub struct WorkflowTxnGeneratorCreator {
stage: StageTracking,
creators: Vec<Box<dyn TransactionGeneratorCreator>>,
stop_condition_per_stage: Vec<StageStopCondition>,
stage_switch_conditions: Vec<StageSwitchCondition>,
}

impl WorkflowTxnGeneratorCreator {
fn new(
stage: StageTracking,
creators: Vec<Box<dyn TransactionGeneratorCreator>>,
stop_condition_per_stage: Vec<StageStopCondition>,
stage_switch_conditions: Vec<StageSwitchCondition>,
) -> Self {
Self {
stage,
creators,
stop_condition_per_stage,
stage_switch_conditions,
}
}

Expand Down Expand Up @@ -344,9 +344,9 @@ impl WorkflowTxnGeneratorCreator {
)),
];
Self::new(stage_tracking, creators, vec![
StageStopCondition::MaxTransactions(Arc::new(AtomicUsize::new(count))),
StageStopCondition::WhenPoolBecomesEmpty(created_pool),
StageStopCondition::WhenPoolBecomesEmpty(minted_pool),
StageSwitchCondition::MaxTransactions(Arc::new(AtomicUsize::new(count))),
StageSwitchCondition::WhenPoolBecomesEmpty(created_pool),
StageSwitchCondition::WhenPoolBecomesEmpty(minted_pool),
])
},
}
Expand All @@ -361,7 +361,7 @@ impl TransactionGeneratorCreator for WorkflowTxnGeneratorCreator {
.iter()
.map(|c| c.create_transaction_generator())
.collect(),
self.stop_condition_per_stage.clone(),
self.stage_switch_conditions.clone(),
))
}
}

0 comments on commit e200586

Please sign in to comment.