Make Forge tests print emitter stats periodically (less frequently - every 1m).
We have a few odd cases, and need to figure out whether the emitter or the nodes are causing the issues.
igor-aptos committed Jun 16, 2023
1 parent 181a3fd commit ff8636e
Showing 4 changed files with 115 additions and 64 deletions.
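The core of the change is the periodic stat loop added to EmitJob below: instead of sleeping silently for a whole phase, the job polls its accumulated stats on an interval and logs the per-window delta. The following is a minimal, self-contained sketch of that pattern, not repo code; the Stats counter, the fake traffic task, and the println! output are stand-ins for the repo's TxnStats accumulator and info! logging.

```rust
use std::{
    cmp::max,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    time::{Duration, Instant},
};

// Toy stand-in for the emitter's accumulated stats (the real type is TxnStats).
#[derive(Default)]
struct Stats {
    submitted: AtomicU64,
}

// Mirrors the loop added to EmitJob::periodic_stat: poll the accumulated stats
// every `interval_secs`, print the delta for that window, and stop once
// `duration` has elapsed (never sleeping past the deadline).
async fn periodic_stat(stats: Arc<Stats>, duration: Duration, interval_secs: u64) {
    let deadline = Instant::now() + duration;
    let window = Duration::from_secs(max(interval_secs, 1));
    let mut prev = 0u64;
    loop {
        let left = deadline.saturating_duration_since(Instant::now());
        if left.is_zero() {
            break;
        }
        tokio::time::sleep(window.min(left)).await;
        let cur = stats.submitted.load(Ordering::Relaxed);
        println!("submitted {} txns in the last window", cur - prev);
        prev = cur;
    }
}

#[tokio::main]
async fn main() {
    let stats = Arc::new(Stats::default());
    let writer = stats.clone();
    // Fake traffic so the stat loop has something to report.
    tokio::spawn(async move {
        loop {
            writer.submitted.fetch_add(100, Ordering::Relaxed);
            tokio::time::sleep(Duration::from_millis(200)).await;
        }
    });
    periodic_stat(stats, Duration::from_secs(5), 1).await;
}
```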
98 changes: 58 additions & 40 deletions crates/transaction-emitter-lib/src/emitter/mod.rs
@@ -493,9 +493,41 @@ impl EmitJob {
self.stats.accumulate(&self.phase_starts)
}

pub fn accumulate(&self) -> Vec<TxnStats> {
pub fn peek_and_accumulate(&self) -> Vec<TxnStats> {
self.stats.accumulate(&self.phase_starts)
}

pub async fn stop_job(self) -> Vec<TxnStats> {
self.stop_and_accumulate().await
}

pub async fn periodic_stat(&self, duration: Duration, interval_secs: u64) {
let deadline = Instant::now() + duration;
let mut prev_stats: Option<Vec<TxnStats>> = None;
let default_stats = TxnStats::default();
let window = Duration::from_secs(max(interval_secs, 1));
loop {
let left = deadline.saturating_duration_since(Instant::now());
if left.is_zero() {
break;
}
tokio::time::sleep(window.min(left)).await;
let cur_phase = self.stats.get_cur_phase();
let stats = self.peek_and_accumulate();
let delta = &stats[cur_phase]
- prev_stats
.as_ref()
.map(|p| &p[cur_phase])
.unwrap_or(&default_stats);
prev_stats = Some(stats);
info!("phase {}: {}", cur_phase, delta.rate());
}
}

pub async fn periodic_stat_forward(self, duration: Duration, interval_secs: u64) -> Self {
self.periodic_stat(duration, interval_secs).await;
self
}
}

#[derive(Debug)]
@@ -615,17 +647,23 @@ impl TxnEmitter {
);

let all_start_sleep_durations = mode_params.get_all_start_sleep_durations(self.from_rng());
let mut all_accounts_iter = all_accounts.into_iter();
let mut workers = vec![];

// Creating workers is slow when there are many of them (TODO: check why),
// so we create them all first and only then start them, so that they start
// at the right time for the traffic pattern to be correct.
info!("Tx emitter creating workers");
let mut submission_workers =
Vec::with_capacity(workers_per_endpoint * req.rest_clients.len());
for _ in 0..workers_per_endpoint {
for client in &req.rest_clients {
let accounts = (&mut all_accounts_iter)
.take(mode_params.accounts_per_worker)
.collect::<Vec<_>>();
let accounts =
all_accounts.split_off(all_accounts.len() - mode_params.accounts_per_worker);
assert!(accounts.len() == mode_params.accounts_per_worker);

let stop = stop.clone();
let stats = Arc::clone(&stats);
let txn_generator = txn_generator_creator.create_transaction_generator();
let worker_index = workers.len();
let worker_index = submission_workers.len();

let worker = SubmissionWorker::new(
accounts,
@@ -638,47 +676,28 @@ impl TxnEmitter {
check_account_sequence_only_once_for.contains(&worker_index),
self.from_rng(),
);
let join_handle = tokio_handle.spawn(worker.run().boxed());
workers.push(Worker { join_handle });
submission_workers.push(worker);
}
}

info!("Tx emitter workers created");
let phase_start = Instant::now();
let workers = submission_workers
.into_iter()
.map(|worker| Worker {
join_handle: tokio_handle.spawn(worker.run(phase_start).boxed()),
})
.collect();
info!("Tx emitter workers started");

Ok(EmitJob {
workers,
stop,
stats,
phase_starts: vec![Instant::now()],
phase_starts: vec![phase_start],
})
}

pub async fn stop_job(self, job: EmitJob) -> Vec<TxnStats> {
job.stop_and_accumulate().await
}

pub fn peek_job_stats(&self, job: &EmitJob) -> Vec<TxnStats> {
job.accumulate()
}

pub async fn periodic_stat(&mut self, job: &EmitJob, duration: Duration, interval_secs: u64) {
let deadline = Instant::now() + duration;
let mut prev_stats: Option<Vec<TxnStats>> = None;
let default_stats = TxnStats::default();
let window = Duration::from_secs(max(interval_secs, 1));
while Instant::now() < deadline {
tokio::time::sleep(window).await;
let cur_phase = job.stats.get_cur_phase();
let stats = self.peek_job_stats(job);
let delta = &stats[cur_phase]
- prev_stats
.as_ref()
.map(|p| &p[cur_phase])
.unwrap_or(&default_stats);
prev_stats = Some(stats);
info!("phase {}: {}", cur_phase, delta.rate());
}
}

async fn emit_txn_for_impl(
mut self,
source_account: &mut LocalAccount,
@@ -704,14 +723,13 @@
job.start_next_phase();
}
if let Some(interval_secs) = print_stats_interval {
self.periodic_stat(&job, per_phase_duration, interval_secs)
.await;
job.periodic_stat(per_phase_duration, interval_secs).await;
} else {
time::sleep(per_phase_duration).await;
}
}
info!("Ran for {} secs, stopping job...", duration.as_secs());
let stats = self.stop_job(job).await;
let stats = job.stop_job().await;
info!("Stopped job");
Ok(stats.into_iter().next().unwrap())
}
55 changes: 41 additions & 14 deletions crates/transaction-emitter-lib/src/emitter/submission_worker.rs
@@ -8,7 +8,7 @@ use crate::{
},
EmitModeParams,
};
use aptos_logger::{sample, sample::SampleRate, warn};
use aptos_logger::{info, sample, sample::SampleRate, warn};
use aptos_rest_client::Client as RestClient;
use aptos_sdk::{
move_types::account_address::AccountAddress,
@@ -69,21 +69,22 @@ impl SubmissionWorker {
}

#[allow(clippy::collapsible_if)]
pub(crate) async fn run(mut self) -> Vec<LocalAccount> {
let start_time = Instant::now() + self.start_sleep_duration;

self.sleep_check_done(self.start_sleep_duration).await;
pub(crate) async fn run(mut self, start_instant: Instant) -> Vec<LocalAccount> {
let mut wait_until = start_instant + self.start_sleep_duration;

let now = Instant::now();
if wait_until > now {
self.sleep_check_done(wait_until - now).await;
}
let wait_duration = Duration::from_millis(self.params.wait_millis);
let mut wait_until = start_time;

while !self.stop.load(Ordering::Relaxed) {
let stats_clone = self.stats.clone();
let loop_stats = stats_clone.get_cur();

let loop_start_time = Arc::new(Instant::now());
let loop_start_time = Instant::now();
if wait_duration.as_secs() > 0
&& loop_start_time.duration_since(wait_until) > wait_duration
&& loop_start_time.duration_since(wait_until) > Duration::from_secs(5)
{
sample!(
SampleRate::Duration(Duration::from_secs(120)),
@@ -114,6 +115,17 @@ impl SubmissionWorker {
})
.or_insert((cur, cur + 1));
}
// Some transaction generators use burner accounts and will have a different
// number of accounts per transaction, so it is useful to log this very rarely.
sample!(
SampleRate::Duration(Duration::from_secs(300)),
info!(
"[{:?}] txn_emitter worker: handling {} accounts, generated txns for: {}",
self.client.path_prefix_string(),
self.accounts.len(),
account_to_start_and_end_seq_num.len(),
)
);

let txn_expiration_time = requests
.iter()
@@ -130,14 +142,26 @@
submit_transactions(
&self.client,
reqs,
loop_start_time.clone(),
loop_start_time,
txn_offset_time.clone(),
loop_stats,
)
}),
)
.await;

let submitted_after = loop_start_time.elapsed();
if submitted_after.as_secs() > 5 {
sample!(
SampleRate::Duration(Duration::from_secs(120)),
warn!(
"[{:?}] txn_emitter worker waited for more than 5s to submit transactions: {}s after loop start",
self.client.path_prefix_string(),
submitted_after.as_secs(),
)
);
}

if self.skip_latency_stats {
// we also don't want to be stuck waiting for txn_expiration_time_secs
// after stop is called, so we sleep until that time or until stop is set.
@@ -148,7 +172,7 @@
}

self.wait_and_update_stats(
*loop_start_time,
loop_start_time,
txn_offset_time.load(Ordering::Relaxed) / (requests.len() as u64),
account_to_start_and_end_seq_num,
// skip latency if asked to check seq_num only once
@@ -159,8 +183,11 @@
// generally, we should never need to recheck, as we wait enough time
// before calling here, but in case of shutdown, or the client we are talking
// to being stale (having a stale transaction_version), we might need to wait.
if self.skip_latency_stats { 10 } else { 1 }
* self.params.check_account_sequence_sleep,
if self.skip_latency_stats {
(10 * self.params.check_account_sequence_sleep).max(Duration::from_secs(3))
} else {
self.params.check_account_sequence_sleep
},
loop_stats,
)
.await;
@@ -288,12 +315,12 @@
pub async fn submit_transactions(
client: &RestClient,
txns: &[SignedTransaction],
loop_start_time: Arc<Instant>,
loop_start_time: Instant,
txn_offset_time: Arc<AtomicU64>,
stats: &StatsAccumulator,
) {
let cur_time = Instant::now();
let offset = cur_time - *loop_start_time;
let offset = cur_time - loop_start_time;
txn_offset_time.fetch_add(
txns.len() as u64 * offset.as_millis() as u64,
Ordering::Relaxed,
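Both emitter changes above serve the same goal: all workers are now constructed before any of them starts, and each worker's staggered start is computed from a single shared phase_start / start_instant rather than from whenever its own task happened to be spawned, so slow worker creation no longer skews the traffic pattern. A small illustrative sketch of that timing calculation follows; remaining_sleep is a made-up helper, not repo code.

```rust
use std::time::{Duration, Instant};

// The per-worker start offset is applied to a shared `start_instant` captured
// once, after all workers are constructed, instead of to whenever each
// worker's task happened to start running.
fn remaining_sleep(start_instant: Instant, start_sleep_duration: Duration) -> Duration {
    let wait_until = start_instant + start_sleep_duration;
    // Zero if we are already past the intended start point.
    wait_until.saturating_duration_since(Instant::now())
}

fn main() {
    let phase_start = Instant::now();
    // E.g. the fourth of many workers, staggered by 100ms each.
    let sleep = remaining_sleep(phase_start, Duration::from_millis(300));
    println!("worker sleeps {:?} before its first submission", sleep);
}
```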
16 changes: 10 additions & 6 deletions testsuite/forge-cli/src/main.rs
@@ -779,13 +779,17 @@ fn realistic_env_load_sweep_test() -> ForgeConfig {
.with_initial_fullnode_count(10)
.add_network_test(wrap_with_realistic_env(LoadVsPerfBenchmark {
test: Box::new(PerformanceBenchmark),
workloads: Workloads::TPS(&[10, 100, 1000, 3000, 5000]),
workloads: Workloads::TPS(&[3000, 4000, 4500, 5000, 5500]), //, 10, 100, 1000, 3000, 5000]),
criteria: [
(9, 1.5, 3.),
(95, 1.5, 3.),
(950, 2., 3.),
(2750, 2.5, 4.),
// (9, 1.5, 3.),
// (95, 1.5, 3.),
// (950, 2., 3.),
// (2750, 2.5, 4.),
(2600, 3., 5.),
(3600, 3., 5.),
(4100, 3., 5.),
(4600, 3., 5.),
(5000, 3., 5.),
]
.into_iter()
.map(|(min_tps, max_lat_p50, max_lat_p99)| {
@@ -1533,7 +1537,7 @@ fn realistic_network_tuned_for_throughput_test() -> ForgeConfig {
["dynamic_max_txn_per_s"] = 6000.into();

// Experimental storage optimizations
helm_values["validator"]["config"]["storage"]["rocksdb_configs"]["use_state_kv_db"] =
helm_values["validator"]["config"]["storage"]["rocksdb_configs"]["split_ledger_db"] =
true.into();
helm_values["validator"]["config"]["storage"]["rocksdb_configs"]
["use_sharded_state_merkle_db"] = true.into();
10 changes: 6 additions & 4 deletions testsuite/testcases/src/lib.rs
Expand Up @@ -236,6 +236,7 @@ impl dyn NetworkLoadTest {
stats_tracking_phases = 3;
}

info!("Starting emitting txns for {}s", duration.as_secs());
let mut job = rt
.block_on(emitter.start_job(
ctx.swarm().chain_info().root_account,
@@ -248,9 +249,8 @@
let cooldown_duration = duration.mul_f32(cooldown_duration_fraction);
let test_duration = duration - warmup_duration - cooldown_duration;
let phase_duration = test_duration.div_f32((stats_tracking_phases - 2) as f32);
info!("Starting emitting txns for {}s", duration.as_secs());

std::thread::sleep(warmup_duration);
job = rt.block_on(job.periodic_stat_forward(warmup_duration, 60));
info!("{}s warmup finished", warmup_duration.as_secs());

let max_start_ledger_transactions = rt
@@ -277,8 +277,10 @@
}
let phase_start = Instant::now();

let join_stats = rt.spawn(job.periodic_stat_forward(phase_duration, 60));
self.test(ctx.swarm, ctx.report, phase_duration)
.context("test NetworkLoadTest")?;
job = rt.block_on(join_stats).context("join stats")?;
actual_phase_durations.push(phase_start.elapsed());
}
let actual_test_duration = test_start.elapsed();
@@ -302,15 +304,15 @@

let cooldown_used = cooldown_start.elapsed();
if cooldown_used < cooldown_duration {
std::thread::sleep(cooldown_duration - cooldown_used);
job = rt.block_on(job.periodic_stat_forward(cooldown_duration - cooldown_used, 60));
}
info!("{}s cooldown finished", cooldown_duration.as_secs());

info!(
"Emitting txns ran for {} secs, stopping job...",
duration.as_secs()
);
let stats_by_phase = rt.block_on(emitter.stop_job(job));
let stats_by_phase = rt.block_on(job.stop_job());

info!("Stopped job");
info!("Warmup stats: {}", stats_by_phase[0].rate());
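The testsuite change relies on periodic_stat_forward taking the job by value and handing it back: the stat loop can be moved into a spawned task while the blocking test body runs, and the job is recovered from the join handle afterwards. A minimal, self-contained sketch of that ownership pattern follows; Job here is a placeholder for the repo's EmitJob, not repo code.

```rust
use std::time::Duration;

// Taking `self` by value and returning it lets the caller move the job into a
// spawned task for the stat loop and recover it once the window is over.
struct Job;

impl Job {
    async fn periodic_stat_forward(self, duration: Duration, _interval_secs: u64) -> Self {
        tokio::time::sleep(duration).await; // stand-in for the real stat loop
        self
    }
}

#[tokio::main]
async fn main() {
    let job = Job;
    // Stats logging runs concurrently with whatever the caller does next.
    let join_stats = tokio::spawn(job.periodic_stat_forward(Duration::from_secs(1), 60));
    // ... run the test body here ...
    let _job = join_stats.await.expect("stat task panicked");
}
```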
