diff --git a/config/src/config/mempool_config.rs b/config/src/config/mempool_config.rs index 459998b3f5b86..8cabf8ba828b8 100644 --- a/config/src/config/mempool_config.rs +++ b/config/src/config/mempool_config.rs @@ -85,8 +85,8 @@ impl Default for MempoolConfig { system_transaction_timeout_secs: 600, system_transaction_gc_interval_ms: 60_000, broadcast_buckets: DEFAULT_BUCKETS.to_vec(), - eager_expire_threshold_ms: Some(10_000), - eager_expire_time_ms: 3_000, + eager_expire_threshold_ms: Some(15_000), + eager_expire_time_ms: 6_000, } } } diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index 20b7f890a630e..6e442023b1ffa 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -29,14 +29,14 @@ impl Default for QuorumStoreBackPressureConfig { QuorumStoreBackPressureConfig { // QS will be backpressured if the remaining total txns is more than this number // Roughly, target TPS * commit latency seconds - backlog_txn_limit_count: 12_000, + backlog_txn_limit_count: 36_000, // QS will create batches at the max rate until this number is reached - backlog_per_validator_batch_limit_count: 4, + backlog_per_validator_batch_limit_count: 20, decrease_duration_ms: 1000, increase_duration_ms: 1000, decrease_fraction: 0.5, dynamic_min_txn_per_s: 160, - dynamic_max_txn_per_s: 4000, + dynamic_max_txn_per_s: 12000, } } } diff --git a/testsuite/forge-cli/src/main.rs b/testsuite/forge-cli/src/main.rs index fab9117f45083..4dbb773c354dc 100644 --- a/testsuite/forge-cli/src/main.rs +++ b/testsuite/forge-cli/src/main.rs @@ -1118,10 +1118,10 @@ fn realistic_env_workload_sweep_test() -> ForgeConfig { ]), // Investigate/improve to make latency more predictable on different workloads criteria: [ - (7700, 100, 0.3, 0.3, 0.5, 0.5), - (7000, 100, 0.3, 0.3, 0.5, 0.5), - (2000, 300, 0.3, 0.8, 0.6, 1.0), - (3200, 500, 0.3, 0.4, 0.7, 0.7), + (7700, 100, 0.3, 0.5, 0.5, 0.5), + (7000, 100, 0.3, 0.5, 0.5, 0.5), + (2000, 300, 0.3, 1.0, 0.6, 1.0), + (3200, 500, 0.3, 1.5, 0.7, 0.7), // (150, 0.5, 1.0, 1.5, 0.65), ] .into_iter() @@ -1966,9 +1966,9 @@ fn realistic_env_max_load_test( .add_system_metrics_threshold(SystemMetricsThreshold::new( // Check that we don't use more than 18 CPU cores for 10% of the time. MetricsThreshold::new(18.0, 10), - // Memory starts around 3GB, and grows around 1.2GB/hr in this test. + // Memory starts around 3.5GB, and grows around 1.4GB/hr in this test. // Check that we don't use more than final expected memory for more than 10% of the time. - MetricsThreshold::new_gb(3.3 + 1.4 * (duration_secs as f64 / 3600.0), 10), + MetricsThreshold::new_gb(3.5 + 1.4 * (duration_secs as f64 / 3600.0), 10), )) .add_no_restarts() .add_wait_for_catchup_s( @@ -1986,8 +1986,8 @@ fn realistic_env_max_load_test( LatencyBreakdownThreshold::new_with_breach_pct( vec![ (LatencyBreakdownSlice::QsBatchToPos, 0.35), - // only reaches close to threshold during epoch change - (LatencyBreakdownSlice::QsPosToProposal, 0.6), + // quorum store backpressure is relaxed, so queueing happens here + (LatencyBreakdownSlice::QsPosToProposal, 2.5), // can be adjusted down if less backpressure (LatencyBreakdownSlice::ConsensusProposalToOrdered, 0.85), // can be adjusted down if less backpressure diff --git a/testsuite/testcases/src/lib.rs b/testsuite/testcases/src/lib.rs index 15ec3b1ffbf2a..c284f1f64eda6 100644 --- a/testsuite/testcases/src/lib.rs +++ b/testsuite/testcases/src/lib.rs @@ -46,8 +46,8 @@ use std::{ }; use tokio::runtime::{Handle, Runtime}; -const WARMUP_DURATION_FRACTION: f32 = 0.07; -const COOLDOWN_DURATION_FRACTION: f32 = 0.04; +pub const WARMUP_DURATION_FRACTION: f32 = 0.07; +pub const COOLDOWN_DURATION_FRACTION: f32 = 0.04; async fn batch_update( ctx: &mut NetworkContext<'_>, @@ -254,7 +254,6 @@ impl NetworkTest for dyn NetworkLoadTest { .await .context("no clients replied for start version")?; let emit_job_request = ctx.emit_job.clone(); - let rng = SeedableRng::from_rng(ctx.core().rng())?; let duration = ctx.global_duration; let stats_by_phase = self .network_load_test( @@ -263,7 +262,6 @@ impl NetworkTest for dyn NetworkLoadTest { duration, WARMUP_DURATION_FRACTION, COOLDOWN_DURATION_FRACTION, - rng, ) .await?; @@ -328,155 +326,173 @@ impl NetworkTest for dyn NetworkLoadTest { } } -impl dyn NetworkLoadTest + '_ { - pub async fn network_load_test<'a>( - &self, - ctx: &mut NetworkContext<'a>, - emit_job_request: EmitJobRequest, - duration: Duration, - warmup_duration_fraction: f32, - cooldown_duration_fraction: f32, - rng: StdRng, - ) -> Result> { - let destination = self.setup(ctx).await.context("setup NetworkLoadTest")?; - let nodes_to_send_load_to = destination.get_destination_nodes(ctx.swarm.clone()).await; +pub async fn create_buffered_load( + swarm: Arc>>, + nodes_to_send_load_to: &[PeerId], + emit_job_request: EmitJobRequest, + duration: Duration, + warmup_duration_fraction: f32, + cooldown_duration_fraction: f32, + mut inner_test_and_report: Option<(&dyn NetworkLoadTest, &mut TestReport)>, +) -> Result> { + // Generate some traffic + let (mut emitter, emit_job_request) = create_emitter_and_request( + swarm.clone(), + emit_job_request, + nodes_to_send_load_to, + StdRng::from_entropy(), + ) + .await + .context("create emitter")?; + + let clients = swarm + .read() + .await + .get_clients_for_peers(nodes_to_send_load_to, Duration::from_secs(10)); - // Generate some traffic + let mut stats_tracking_phases = emit_job_request.get_num_phases(); + assert!(stats_tracking_phases > 0 && stats_tracking_phases != 2); + if stats_tracking_phases == 1 { + stats_tracking_phases = 3; + } - let (mut emitter, emit_job_request) = create_emitter_and_request( - ctx.swarm.clone(), + info!("Starting emitting txns for {}s", duration.as_secs()); + let mut job = emitter + .start_job( + swarm.read().await.chain_info().root_account, emit_job_request, - &nodes_to_send_load_to, - rng, + stats_tracking_phases, ) .await - .context("create emitter")?; + .context("start emitter job")?; - let clients = ctx - .swarm - .read() - .await - .get_clients_for_peers(&nodes_to_send_load_to, Duration::from_secs(10)); - - let mut stats_tracking_phases = emit_job_request.get_num_phases(); - assert!(stats_tracking_phases > 0 && stats_tracking_phases != 2); - if stats_tracking_phases == 1 { - stats_tracking_phases = 3; - } - - info!("Starting emitting txns for {}s", duration.as_secs()); - let mut job = emitter - .start_job( - ctx.swarm.read().await.chain_info().root_account, - emit_job_request, - stats_tracking_phases, - ) - .await - .context("start emitter job")?; - - let total_start = PhaseTimingStart::now(); + let total_start = PhaseTimingStart::now(); - let warmup_duration = duration.mul_f32(warmup_duration_fraction); - let cooldown_duration = duration.mul_f32(cooldown_duration_fraction); - let test_duration = duration - warmup_duration - cooldown_duration; - let phase_duration = test_duration.div_f32((stats_tracking_phases - 2) as f32); + let warmup_duration = duration.mul_f32(warmup_duration_fraction); + let cooldown_duration = duration.mul_f32(cooldown_duration_fraction); + let test_duration = duration - warmup_duration - cooldown_duration; + let phase_duration = test_duration.div_f32((stats_tracking_phases - 2) as f32); - job = job.periodic_stat_forward(warmup_duration, 60).await; - info!("{}s warmup finished", warmup_duration.as_secs()); + job = job.periodic_stat_forward(warmup_duration, 60).await; + info!("{}s warmup finished", warmup_duration.as_secs()); - let mut phase_timing = Vec::new(); - let mut phase_start_network_state = Vec::new(); - let test_start = Instant::now(); - for i in 0..stats_tracking_phases - 2 { - phase_start_network_state.push(NetworkState::new(&clients).await); - job.start_next_phase(); + let mut phase_timing = Vec::new(); + let mut phase_start_network_state = Vec::new(); + let test_start = Instant::now(); + for i in 0..stats_tracking_phases - 2 { + phase_start_network_state.push(NetworkState::new(&clients).await); + job.start_next_phase(); - if i > 0 { - info!( - "Starting test phase {} out of {}", - i, - stats_tracking_phases - 2, - ); - } - let phase_start = PhaseTimingStart::now(); + if i > 0 { + info!( + "Starting test phase {} out of {}", + i, + stats_tracking_phases - 2, + ); + } + let phase_start = PhaseTimingStart::now(); - let join_stats = Handle::current().spawn(job.periodic_stat_forward(phase_duration, 60)); - self.test(ctx.swarm.clone(), ctx.report, phase_duration) + let join_stats = Handle::current().spawn(job.periodic_stat_forward(phase_duration, 60)); + if let Some((inner_test, context)) = inner_test_and_report.as_mut() { + inner_test + .test(swarm.clone(), context, phase_duration) .await .context("test NetworkLoadTest")?; - job = join_stats.await.context("join stats")?; - phase_timing.push(phase_start.elapsed()); } - let actual_test_duration = test_start.elapsed(); - info!( - "{}s test finished after {}s", - test_duration.as_secs(), - actual_test_duration.as_secs() - ); - - phase_start_network_state.push(NetworkState::new(&clients).await); - job.start_next_phase(); - let cooldown_start = Instant::now(); + job = join_stats.await.context("join stats")?; + phase_timing.push(phase_start.elapsed()); + } + let actual_test_duration = test_start.elapsed(); + info!( + "{}s test finished after {}s", + test_duration.as_secs(), + actual_test_duration.as_secs() + ); - let cooldown_used = cooldown_start.elapsed(); - if cooldown_used < cooldown_duration { - job = job - .periodic_stat_forward(cooldown_duration - cooldown_used, 60) - .await; - } - info!("{}s cooldown finished", cooldown_duration.as_secs()); + phase_start_network_state.push(NetworkState::new(&clients).await); + job.start_next_phase(); + let cooldown_start = Instant::now(); - let total_timing = total_start.elapsed(); + let cooldown_used = cooldown_start.elapsed(); + if cooldown_used < cooldown_duration { + job = job + .periodic_stat_forward(cooldown_duration - cooldown_used, 60) + .await; + } + info!("{}s cooldown finished", cooldown_duration.as_secs()); + + let total_timing = total_start.elapsed(); + info!( + "Emitting txns ran for {} secs(from {} to {}), stopping job...", + duration.as_secs(), + total_timing.start_unixtime_s, + total_timing.end_unixtime_s, + ); + let stats_by_phase = job.stop_job().await; + + info!("Stopped job"); + info!("Warmup stats: {}", stats_by_phase[0].rate()); + + let mut stats: Option = None; + let mut stats_by_phase_filtered = Vec::new(); + for i in 0..stats_tracking_phases - 2 { + let next_i = i + 1; + let cur = &stats_by_phase[next_i]; + info!("Test stats [test phase {}]: {}", i, cur.rate()); + stats = if let Some(previous) = stats { + Some(&previous + cur) + } else { + Some(cur.clone()) + }; + let latency_breakdown = fetch_latency_breakdown( + swarm.clone(), + phase_timing[i].start_unixtime_s, + phase_timing[i].end_unixtime_s, + ) + .await?; info!( - "Emitting txns ran for {} secs(from {} to {}), stopping job...", - duration.as_secs(), - total_timing.start_unixtime_s, - total_timing.end_unixtime_s, + "Latency breakdown details for phase {}: from {} to {}: {:?}", + i, phase_timing[i].start_unixtime_s, phase_timing[i].end_unixtime_s, latency_breakdown ); - let stats_by_phase = job.stop_job().await; - - info!("Stopped job"); - info!("Warmup stats: {}", stats_by_phase[0].rate()); - - let mut stats: Option = None; - let mut stats_by_phase_filtered = Vec::new(); - for i in 0..stats_tracking_phases - 2 { - let next_i = i + 1; - let cur = &stats_by_phase[next_i]; - info!("Test stats [test phase {}]: {}", i, cur.rate()); - stats = if let Some(previous) = stats { - Some(&previous + cur) - } else { - Some(cur.clone()) - }; - let latency_breakdown = fetch_latency_breakdown( - ctx.swarm.clone(), - phase_timing[i].start_unixtime_s, - phase_timing[i].end_unixtime_s, - ) - .await?; - info!( - "Latency breakdown details for phase {}: from {} to {}: {:?}", - i, - phase_timing[i].start_unixtime_s, - phase_timing[i].end_unixtime_s, - latency_breakdown - ); - stats_by_phase_filtered.push(LoadTestPhaseStats { - emitter_stats: cur.clone(), - actual_duration: phase_timing[i].duration, - phase_start_unixtime_s: phase_timing[i].start_unixtime_s, - phase_end_unixtime_s: phase_timing[i].end_unixtime_s, - ledger_transactions: NetworkState::ledger_transactions( - &phase_start_network_state[i], - &phase_start_network_state[next_i], - ), - latency_breakdown, - }); - } - info!("Cooldown stats: {}", stats_by_phase.last().unwrap().rate()); + stats_by_phase_filtered.push(LoadTestPhaseStats { + emitter_stats: cur.clone(), + actual_duration: phase_timing[i].duration, + phase_start_unixtime_s: phase_timing[i].start_unixtime_s, + phase_end_unixtime_s: phase_timing[i].end_unixtime_s, + ledger_transactions: NetworkState::ledger_transactions( + &phase_start_network_state[i], + &phase_start_network_state[next_i], + ), + latency_breakdown, + }); + } + info!("Cooldown stats: {}", stats_by_phase.last().unwrap().rate()); - Ok(stats_by_phase_filtered) + Ok(stats_by_phase_filtered) +} + +impl dyn NetworkLoadTest + '_ { + pub async fn network_load_test<'a>( + &self, + ctx: &mut NetworkContext<'a>, + emit_job_request: EmitJobRequest, + duration: Duration, + warmup_duration_fraction: f32, + cooldown_duration_fraction: f32, + ) -> Result> { + let destination = self.setup(ctx).await.context("setup NetworkLoadTest")?; + let nodes_to_send_load_to = destination.get_destination_nodes(ctx.swarm.clone()).await; + + create_buffered_load( + ctx.swarm.clone(), + &nodes_to_send_load_to, + emit_job_request, + duration, + warmup_duration_fraction, + cooldown_duration_fraction, + Some((self, ctx.report)), + ) + .await } } diff --git a/testsuite/testcases/src/load_vs_perf_benchmark.rs b/testsuite/testcases/src/load_vs_perf_benchmark.rs index 64123c184453f..dafc07bdb242f 100644 --- a/testsuite/testcases/src/load_vs_perf_benchmark.rs +++ b/testsuite/testcases/src/load_vs_perf_benchmark.rs @@ -188,7 +188,6 @@ impl LoadVsPerfBenchmark { index: usize, duration: Duration, ) -> Result> { - let rng = SeedableRng::from_rng(ctx.core().rng())?; let emit_job_request = workloads.configure(index, ctx.emit_job.clone()); let stats_by_phase = self .test @@ -198,7 +197,6 @@ impl LoadVsPerfBenchmark { duration, PER_TEST_WARMUP_DURATION_FRACTION, PER_TEST_COOLDOWN_DURATION_FRACTION, - rng, ) .await?; diff --git a/testsuite/testcases/src/two_traffics_test.rs b/testsuite/testcases/src/two_traffics_test.rs index 931881f4e6956..f7955c8712bbb 100644 --- a/testsuite/testcases/src/two_traffics_test.rs +++ b/testsuite/testcases/src/two_traffics_test.rs @@ -1,18 +1,17 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{create_emitter_and_request, LoadDestination, NetworkLoadTest}; +use crate::{ + create_buffered_load, LoadDestination, NetworkLoadTest, COOLDOWN_DURATION_FRACTION, + WARMUP_DURATION_FRACTION, +}; use aptos_forge::{ success_criteria::{SuccessCriteria, SuccessCriteriaChecker}, EmitJobRequest, NetworkContextSynchronizer, NetworkTest, Result, Swarm, Test, TestReport, }; use aptos_logger::info; use async_trait::async_trait; -use rand::{rngs::OsRng, Rng, SeedableRng}; -use std::{ - sync::Arc, - time::{Duration, Instant}, -}; +use std::{sync::Arc, time::Duration}; pub struct TwoTrafficsTest { pub inner_traffic: EmitJobRequest, @@ -40,44 +39,33 @@ impl NetworkLoadTest for TwoTrafficsTest { let nodes_to_send_load_to = LoadDestination::FullnodesOtherwiseValidators .get_destination_nodes(swarm.clone()) .await; - let rng = ::rand::rngs::StdRng::from_seed(OsRng.gen()); - let (emitter, emit_job_request) = create_emitter_and_request( - swarm.clone(), - self.inner_traffic.clone(), + let stats_by_phase = create_buffered_load( + swarm, &nodes_to_send_load_to, - rng, + self.inner_traffic.clone(), + duration, + WARMUP_DURATION_FRACTION, + COOLDOWN_DURATION_FRACTION, + None, ) .await?; - let test_start = Instant::now(); + for phase_stats in stats_by_phase.into_iter() { + report.report_txn_stats( + format!("{}: inner traffic", self.name()), + &phase_stats.emitter_stats, + ); - let stats = emitter - .emit_txn_for( - swarm.read().await.chain_info().root_account, - emit_job_request, - duration, - ) - .await?; + SuccessCriteriaChecker::check_core_for_success( + &self.inner_success_criteria, + report, + &phase_stats.emitter_stats.rate(), + None, + Some("inner traffic".to_string()), + )?; + } - let actual_test_duration = test_start.elapsed(); - info!( - "End to end duration: {}s, while txn emitter lasted: {}s", - actual_test_duration.as_secs(), - stats.lasted.as_secs() - ); - - let rate = stats.rate(); - - report.report_txn_stats(format!("{}: inner traffic", self.name()), &stats); - - SuccessCriteriaChecker::check_core_for_success( - &self.inner_success_criteria, - report, - &rate, - None, - Some("inner traffic".to_string()), - )?; Ok(()) } }