From ff8636e8672bac4d87acd94f356eb643fb0c3b73 Mon Sep 17 00:00:00 2001
From: Igor
Date: Thu, 15 Jun 2023 16:13:23 -0700
Subject: [PATCH] Make Forge tests print emit stats periodically (every 1m)

We have a few odd cases, and need to figure out whether it is the emitter
or the nodes that are causing the issues.
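The reporting this adds is a windowed-delta loop over cumulative counters:
keep the previous accumulate() snapshot and print only the difference each
interval. A minimal standalone sketch of the same pattern (Snapshot and the
fake poll closure are stand-ins for the real TxnStats and
EmitJob::peek_and_accumulate, not the actual types):

    use std::time::{Duration, Instant};

    #[derive(Clone, Copy, Default)]
    struct Snapshot {
        submitted: u64,
        committed: u64,
    }

    fn main() {
        let start = Instant::now();
        // Fake cumulative counters, standing in for peek_and_accumulate().
        let poll = || {
            let secs = start.elapsed().as_secs();
            Snapshot { submitted: secs * 100, committed: secs * 95 }
        };

        let deadline = Instant::now() + Duration::from_secs(5);
        let window = Duration::from_secs(1);
        let mut prev = Snapshot::default();
        loop {
            let left = deadline.saturating_duration_since(Instant::now());
            if left.is_zero() {
                break;
            }
            // Never sleep past the deadline (the patch uses window.min(left) too).
            std::thread::sleep(window.min(left));
            let cur = poll();
            // Print only this window's delta, not the running totals.
            println!(
                "window: submitted {}, committed {}",
                cur.submitted - prev.submitted,
                cur.committed - prev.committed
            );
            prev = cur;
        }
    }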
+ info!("Tx emitter creating workers"); + let mut submission_workers = + Vec::with_capacity(workers_per_endpoint * req.rest_clients.len()); for _ in 0..workers_per_endpoint { for client in &req.rest_clients { - let accounts = (&mut all_accounts_iter) - .take(mode_params.accounts_per_worker) - .collect::>(); + let accounts = + all_accounts.split_off(all_accounts.len() - mode_params.accounts_per_worker); + assert!(accounts.len() == mode_params.accounts_per_worker); + let stop = stop.clone(); let stats = Arc::clone(&stats); let txn_generator = txn_generator_creator.create_transaction_generator(); - let worker_index = workers.len(); + let worker_index = submission_workers.len(); let worker = SubmissionWorker::new( accounts, @@ -638,47 +676,28 @@ impl TxnEmitter { check_account_sequence_only_once_for.contains(&worker_index), self.from_rng(), ); - let join_handle = tokio_handle.spawn(worker.run().boxed()); - workers.push(Worker { join_handle }); + submission_workers.push(worker); } } + + info!("Tx emitter workers created"); + let phase_start = Instant::now(); + let workers = submission_workers + .into_iter() + .map(|worker| Worker { + join_handle: tokio_handle.spawn(worker.run(phase_start).boxed()), + }) + .collect(); info!("Tx emitter workers started"); Ok(EmitJob { workers, stop, stats, - phase_starts: vec![Instant::now()], + phase_starts: vec![phase_start], }) } - pub async fn stop_job(self, job: EmitJob) -> Vec { - job.stop_and_accumulate().await - } - - pub fn peek_job_stats(&self, job: &EmitJob) -> Vec { - job.accumulate() - } - - pub async fn periodic_stat(&mut self, job: &EmitJob, duration: Duration, interval_secs: u64) { - let deadline = Instant::now() + duration; - let mut prev_stats: Option> = None; - let default_stats = TxnStats::default(); - let window = Duration::from_secs(max(interval_secs, 1)); - while Instant::now() < deadline { - tokio::time::sleep(window).await; - let cur_phase = job.stats.get_cur_phase(); - let stats = self.peek_job_stats(job); - let delta = &stats[cur_phase] - - prev_stats - .as_ref() - .map(|p| &p[cur_phase]) - .unwrap_or(&default_stats); - prev_stats = Some(stats); - info!("phase {}: {}", cur_phase, delta.rate()); - } - } - async fn emit_txn_for_impl( mut self, source_account: &mut LocalAccount, @@ -704,14 +723,13 @@ impl TxnEmitter { job.start_next_phase(); } if let Some(interval_secs) = print_stats_interval { - self.periodic_stat(&job, per_phase_duration, interval_secs) - .await; + job.periodic_stat(per_phase_duration, interval_secs).await; } else { time::sleep(per_phase_duration).await; } } info!("Ran for {} secs, stopping job...", duration.as_secs()); - let stats = self.stop_job(job).await; + let stats = job.stop_job().await; info!("Stopped job"); Ok(stats.into_iter().next().unwrap()) } diff --git a/crates/transaction-emitter-lib/src/emitter/submission_worker.rs b/crates/transaction-emitter-lib/src/emitter/submission_worker.rs index 54051976a00a42..499c17e0f091fb 100644 --- a/crates/transaction-emitter-lib/src/emitter/submission_worker.rs +++ b/crates/transaction-emitter-lib/src/emitter/submission_worker.rs @@ -8,7 +8,7 @@ use crate::{ }, EmitModeParams, }; -use aptos_logger::{sample, sample::SampleRate, warn}; +use aptos_logger::{info, sample, sample::SampleRate, warn}; use aptos_rest_client::Client as RestClient; use aptos_sdk::{ move_types::account_address::AccountAddress, @@ -69,21 +69,22 @@ impl SubmissionWorker { } #[allow(clippy::collapsible_if)] - pub(crate) async fn run(mut self) -> Vec { - let start_time = Instant::now() + 
diff --git a/crates/transaction-emitter-lib/src/emitter/submission_worker.rs b/crates/transaction-emitter-lib/src/emitter/submission_worker.rs
index 54051976a00a42..499c17e0f091fb 100644
--- a/crates/transaction-emitter-lib/src/emitter/submission_worker.rs
+++ b/crates/transaction-emitter-lib/src/emitter/submission_worker.rs
@@ -8,7 +8,7 @@ use crate::{
     },
     EmitModeParams,
 };
-use aptos_logger::{sample, sample::SampleRate, warn};
+use aptos_logger::{info, sample, sample::SampleRate, warn};
 use aptos_rest_client::Client as RestClient;
 use aptos_sdk::{
     move_types::account_address::AccountAddress,
@@ -69,21 +69,22 @@ impl SubmissionWorker {
     }
 
     #[allow(clippy::collapsible_if)]
-    pub(crate) async fn run(mut self) -> Vec<LocalAccount> {
-        let start_time = Instant::now() + self.start_sleep_duration;
-
-        self.sleep_check_done(self.start_sleep_duration).await;
+    pub(crate) async fn run(mut self, start_instant: Instant) -> Vec<LocalAccount> {
+        let mut wait_until = start_instant + self.start_sleep_duration;
+        let now = Instant::now();
+        if wait_until > now {
+            self.sleep_check_done(wait_until - now).await;
+        }
         let wait_duration = Duration::from_millis(self.params.wait_millis);
-        let mut wait_until = start_time;
 
         while !self.stop.load(Ordering::Relaxed) {
             let stats_clone = self.stats.clone();
             let loop_stats = stats_clone.get_cur();
 
-            let loop_start_time = Arc::new(Instant::now());
+            let loop_start_time = Instant::now();
             if wait_duration.as_secs() > 0
-                && loop_start_time.duration_since(wait_until) > wait_duration
+                && loop_start_time.duration_since(wait_until) > Duration::from_secs(5)
             {
                 sample!(
                     SampleRate::Duration(Duration::from_secs(120)),
@@ -114,6 +115,17 @@ impl SubmissionWorker {
                     })
                     .or_insert((cur, cur + 1));
             }
+            // Some transaction generators use burner accounts and have a different
+            // number of accounts per transaction, so it is useful to log this very rarely.
+            sample!(
+                SampleRate::Duration(Duration::from_secs(300)),
+                info!(
+                    "[{:?}] txn_emitter worker: handling {} accounts, generated txns for: {}",
+                    self.client.path_prefix_string(),
+                    self.accounts.len(),
+                    account_to_start_and_end_seq_num.len(),
+                )
+            );
 
             let txn_expiration_time = requests
                 .iter()
@@ -130,7 +142,7 @@ impl SubmissionWorker {
                     submit_transactions(
                         &self.client,
                         reqs,
-                        loop_start_time.clone(),
+                        loop_start_time,
                         txn_offset_time.clone(),
                         loop_stats,
                     )
@@ -138,6 +150,18 @@ impl SubmissionWorker {
             )
             .await;
 
+            let submitted_after = loop_start_time.elapsed();
+            if submitted_after.as_secs() > 5 {
+                sample!(
+                    SampleRate::Duration(Duration::from_secs(120)),
+                    warn!(
+                        "[{:?}] txn_emitter worker waited for more than 5s to submit transactions: {}s after loop start",
+                        self.client.path_prefix_string(),
+                        submitted_after.as_secs(),
+                    )
+                );
+            }
+
             if self.skip_latency_stats {
                 // we also don't want to be stuck waiting for txn_expiration_time_secs
                 // after stop is called, so we sleep until time or stop is set.
@@ -148,7 +172,7 @@ impl SubmissionWorker {
             }
 
             self.wait_and_update_stats(
-                *loop_start_time,
+                loop_start_time,
                 txn_offset_time.load(Ordering::Relaxed) / (requests.len() as u64),
                 account_to_start_and_end_seq_num,
                 // skip latency if asked to check seq_num only once
@@ -159,8 +183,11 @@ impl SubmissionWorker {
                 // generally, we should never need to recheck, as we wait enough time
                 // before calling here, but in case of shutdown/or client we are talking
                 // to being stale (having stale transaction_version), we might need to wait.
-                if self.skip_latency_stats { 10 } else { 1 }
-                    * self.params.check_account_sequence_sleep,
+                if self.skip_latency_stats {
+                    (10 * self.params.check_account_sequence_sleep).max(Duration::from_secs(3))
+                } else {
+                    self.params.check_account_sequence_sleep
+                },
                 loop_stats,
             )
             .await;
@@ -288,12 +315,12 @@ impl SubmissionWorker {
 pub async fn submit_transactions(
     client: &RestClient,
     txns: &[SignedTransaction],
-    loop_start_time: Arc<Instant>,
+    loop_start_time: Instant,
     txn_offset_time: Arc<AtomicU64>,
     stats: &StatsAccumulator,
 ) {
     let cur_time = Instant::now();
-    let offset = cur_time - *loop_start_time;
+    let offset = cur_time - loop_start_time;
     txn_offset_time.fetch_add(
         txns.len() as u64 * offset.as_millis() as u64,
         Ordering::Relaxed,
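The warnings added above fire only occasionally, via aptos_logger's sample!
macro. A standalone sketch of the same rate-limited lateness check, with a
hand-rolled Sampled helper standing in for sample!:

    use std::time::{Duration, Instant};

    // Stand-in for aptos_logger's sample! macro: returns true at most once
    // per `every` window.
    struct Sampled {
        every: Duration,
        last: Option<Instant>,
    }

    impl Sampled {
        fn fire(&mut self) -> bool {
            let now = Instant::now();
            match self.last {
                Some(prev) if now.duration_since(prev) < self.every => false,
                _ => {
                    self.last = Some(now);
                    true
                },
            }
        }
    }

    fn main() {
        let mut lateness_warning = Sampled { every: Duration::from_secs(120), last: None };
        let loop_start_time = Instant::now();
        std::thread::sleep(Duration::from_secs(6)); // pretend submission was slow
        let submitted_after = loop_start_time.elapsed();
        // Same shape as the check in the patch: warn if submission fell more
        // than 5s behind, but emit the log line at most once per two minutes.
        if submitted_after.as_secs() > 5 && lateness_warning.fire() {
            eprintln!(
                "waited more than 5s to submit transactions: {}s after loop start",
                submitted_after.as_secs()
            );
        }
    }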
diff --git a/testsuite/forge-cli/src/main.rs b/testsuite/forge-cli/src/main.rs
index 2e5f7659c11876..167988980dc098 100644
--- a/testsuite/forge-cli/src/main.rs
+++ b/testsuite/forge-cli/src/main.rs
@@ -779,13 +779,17 @@ fn realistic_env_load_sweep_test() -> ForgeConfig {
         .with_initial_fullnode_count(10)
         .add_network_test(wrap_with_realistic_env(LoadVsPerfBenchmark {
             test: Box::new(PerformanceBenchmark),
-            workloads: Workloads::TPS(&[10, 100, 1000, 3000, 5000]),
+            workloads: Workloads::TPS(&[3000, 4000, 4500, 5000, 5500]), //, 10, 100, 1000, 3000, 5000]),
             criteria: [
-                (9, 1.5, 3.),
-                (95, 1.5, 3.),
-                (950, 2., 3.),
-                (2750, 2.5, 4.),
+                // (9, 1.5, 3.),
+                // (95, 1.5, 3.),
+                // (950, 2., 3.),
+                // (2750, 2.5, 4.),
+                (2600, 3., 5.),
+                (3600, 3., 5.),
+                (4100, 3., 5.),
                 (4600, 3., 5.),
+                (5000, 3., 5.),
             ]
             .into_iter()
             .map(|(min_tps, max_lat_p50, max_lat_p99)| {
@@ -1533,7 +1537,7 @@ fn realistic_network_tuned_for_throughput_test() -> ForgeConfig {
                 ["dynamic_max_txn_per_s"] = 6000.into();
 
             // Experimental storage optimizations
-            helm_values["validator"]["config"]["storage"]["rocksdb_configs"]["use_state_kv_db"] =
+            helm_values["validator"]["config"]["storage"]["rocksdb_configs"]["split_ledger_db"] =
                 true.into();
             helm_values["validator"]["config"]["storage"]["rocksdb_configs"]
                 ["use_sharded_state_merkle_db"] = true.into();
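The sweep criteria above are plain (min_tps, max_lat_p50, max_lat_p99)
tuples mapped into per-workload criteria objects. A minimal sketch of that
mapping, with a hypothetical Criteria struct standing in for the Forge type
built in the real .map(...) call:

    // Hypothetical stand-in for the success-criteria type Forge constructs.
    #[derive(Debug)]
    struct Criteria {
        min_tps: usize,
        max_lat_p50_secs: f64,
        max_lat_p99_secs: f64,
    }

    fn main() {
        let criteria: Vec<Criteria> = [
            (2600, 3., 5.),
            (3600, 3., 5.),
            (4100, 3., 5.),
            (4600, 3., 5.),
            (5000, 3., 5.),
        ]
        .into_iter()
        .map(|(min_tps, max_lat_p50, max_lat_p99)| Criteria {
            min_tps,
            max_lat_p50_secs: max_lat_p50,
            max_lat_p99_secs: max_lat_p99,
        })
        .collect();
        println!("{:#?}", criteria);
    }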
diff --git a/testsuite/testcases/src/lib.rs b/testsuite/testcases/src/lib.rs
index 9d98d766d30544..26d3424399776d 100644
--- a/testsuite/testcases/src/lib.rs
+++ b/testsuite/testcases/src/lib.rs
@@ -236,6 +236,7 @@ impl dyn NetworkLoadTest {
             stats_tracking_phases = 3;
         }
 
+        info!("Starting emitting txns for {}s", duration.as_secs());
         let mut job = rt
             .block_on(emitter.start_job(
                 ctx.swarm().chain_info().root_account,
@@ -248,9 +249,8 @@ impl dyn NetworkLoadTest {
         let cooldown_duration = duration.mul_f32(cooldown_duration_fraction);
         let test_duration = duration - warmup_duration - cooldown_duration;
         let phase_duration = test_duration.div_f32((stats_tracking_phases - 2) as f32);
-        info!("Starting emitting txns for {}s", duration.as_secs());
 
-        std::thread::sleep(warmup_duration);
+        job = rt.block_on(job.periodic_stat_forward(warmup_duration, 60));
         info!("{}s warmup finished", warmup_duration.as_secs());
 
         let max_start_ledger_transactions = rt
@@ -277,8 +277,10 @@ impl dyn NetworkLoadTest {
             }
 
             let phase_start = Instant::now();
+            let join_stats = rt.spawn(job.periodic_stat_forward(phase_duration, 60));
             self.test(ctx.swarm, ctx.report, phase_duration)
                 .context("test NetworkLoadTest")?;
+            job = rt.block_on(join_stats).context("join stats")?;
             actual_phase_durations.push(phase_start.elapsed());
         }
         let actual_test_duration = test_start.elapsed();
@@ -302,7 +304,7 @@ impl dyn NetworkLoadTest {
 
         let cooldown_used = cooldown_start.elapsed();
         if cooldown_used < cooldown_duration {
-            std::thread::sleep(cooldown_duration - cooldown_used);
+            job = rt.block_on(job.periodic_stat_forward(cooldown_duration - cooldown_used, 60));
         }
         info!("{}s cooldown finished", cooldown_duration.as_secs());
 
@@ -310,7 +312,7 @@ impl dyn NetworkLoadTest {
             "Emitting txns ran for {} secs, stopping job...",
             duration.as_secs()
         );
-        let stats_by_phase = rt.block_on(emitter.stop_job(job));
+        let stats_by_phase = rt.block_on(job.stop_job());
         info!("Stopped job");
 
         info!("Warmup stats: {}", stats_by_phase[0].rate());
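The lib.rs changes hinge on periodic_stat_forward consuming the job and
handing it back: the job can move into a spawned stats task while the test
phase runs, then be reclaimed from the JoinHandle. A minimal sketch of that
ownership flow (Job is a stand-in for EmitJob; a tokio runtime is assumed):

    use std::time::Duration;

    struct Job;

    impl Job {
        async fn periodic_stat(&self, duration: Duration, interval_secs: u64) {
            let step = Duration::from_secs(interval_secs.max(1));
            let mut left = duration;
            while !left.is_zero() {
                let sleep = step.min(left);
                tokio::time::sleep(sleep).await;
                left -= sleep;
                println!("stats tick");
            }
        }

        // Consuming and returning self is what lets the caller move the job
        // into tokio::spawn (which needs 'static) and still get it back.
        async fn periodic_stat_forward(self, duration: Duration, interval_secs: u64) -> Self {
            self.periodic_stat(duration, interval_secs).await;
            self
        }
    }

    #[tokio::main]
    async fn main() {
        let job = Job;
        // Stats print in the background while the test phase runs here...
        let join_stats = tokio::spawn(job.periodic_stat_forward(Duration::from_secs(3), 1));
        // ... run the phase ...
        let _job = join_stats.await.unwrap(); // ownership of the job comes back
    }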