From 8d7874336a95c17bfcdb387e5c02452f446ff7b7 Mon Sep 17 00:00:00 2001 From: igor-aptos <110557261+igor-aptos@users.noreply.github.com> Date: Wed, 10 Jul 2024 12:29:30 -0700 Subject: [PATCH] Re-apply qs backpressure increase with buffer latency increase (#13961) * [quorum store] reduce backpressure significantly for more TPS (#13558) As Quorum Store batches are bucketed, and we are looking to increase block limits, now is the time to reduce Quorum Store backpressure. We now allow 36K transactions outstanding. At 12K TPS, this is approximately 3 seconds worth of batches. For forge tests, a lot of the queuing shifts from mempool to POS-to-Proposal, so the limits need to be adjusted accordingly. * increase buffer for expiration in batch creation * adding buffers on inner traffic as well --------- Co-authored-by: Brian (Sunghoon) Cho --- config/src/config/mempool_config.rs | 4 +- config/src/config/quorum_store_config.rs | 6 +- testsuite/forge-cli/src/main.rs | 16 +- testsuite/testcases/src/lib.rs | 284 +++++++++--------- .../testcases/src/load_vs_perf_benchmark.rs | 2 - testsuite/testcases/src/two_traffics_test.rs | 62 ++-- 6 files changed, 188 insertions(+), 186 deletions(-) diff --git a/config/src/config/mempool_config.rs b/config/src/config/mempool_config.rs index 459998b3f5b86..8cabf8ba828b8 100644 --- a/config/src/config/mempool_config.rs +++ b/config/src/config/mempool_config.rs @@ -85,8 +85,8 @@ impl Default for MempoolConfig { system_transaction_timeout_secs: 600, system_transaction_gc_interval_ms: 60_000, broadcast_buckets: DEFAULT_BUCKETS.to_vec(), - eager_expire_threshold_ms: Some(10_000), - eager_expire_time_ms: 3_000, + eager_expire_threshold_ms: Some(15_000), + eager_expire_time_ms: 6_000, } } } diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index 20b7f890a630e..6e442023b1ffa 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -29,14 +29,14 @@ impl Default for QuorumStoreBackPressureConfig { QuorumStoreBackPressureConfig { // QS will be backpressured if the remaining total txns is more than this number // Roughly, target TPS * commit latency seconds - backlog_txn_limit_count: 12_000, + backlog_txn_limit_count: 36_000, // QS will create batches at the max rate until this number is reached - backlog_per_validator_batch_limit_count: 4, + backlog_per_validator_batch_limit_count: 20, decrease_duration_ms: 1000, increase_duration_ms: 1000, decrease_fraction: 0.5, dynamic_min_txn_per_s: 160, - dynamic_max_txn_per_s: 4000, + dynamic_max_txn_per_s: 12000, } } } diff --git a/testsuite/forge-cli/src/main.rs b/testsuite/forge-cli/src/main.rs index fab9117f45083..4dbb773c354dc 100644 --- a/testsuite/forge-cli/src/main.rs +++ b/testsuite/forge-cli/src/main.rs @@ -1118,10 +1118,10 @@ fn realistic_env_workload_sweep_test() -> ForgeConfig { ]), // Investigate/improve to make latency more predictable on different workloads criteria: [ - (7700, 100, 0.3, 0.3, 0.5, 0.5), - (7000, 100, 0.3, 0.3, 0.5, 0.5), - (2000, 300, 0.3, 0.8, 0.6, 1.0), - (3200, 500, 0.3, 0.4, 0.7, 0.7), + (7700, 100, 0.3, 0.5, 0.5, 0.5), + (7000, 100, 0.3, 0.5, 0.5, 0.5), + (2000, 300, 0.3, 1.0, 0.6, 1.0), + (3200, 500, 0.3, 1.5, 0.7, 0.7), // (150, 0.5, 1.0, 1.5, 0.65), ] .into_iter() @@ -1966,9 +1966,9 @@ fn realistic_env_max_load_test( .add_system_metrics_threshold(SystemMetricsThreshold::new( // Check that we don't use more than 18 CPU cores for 10% of the time. MetricsThreshold::new(18.0, 10), - // Memory starts around 3GB, and grows around 1.2GB/hr in this test. + // Memory starts around 3.5GB, and grows around 1.4GB/hr in this test. // Check that we don't use more than final expected memory for more than 10% of the time. - MetricsThreshold::new_gb(3.3 + 1.4 * (duration_secs as f64 / 3600.0), 10), + MetricsThreshold::new_gb(3.5 + 1.4 * (duration_secs as f64 / 3600.0), 10), )) .add_no_restarts() .add_wait_for_catchup_s( @@ -1986,8 +1986,8 @@ fn realistic_env_max_load_test( LatencyBreakdownThreshold::new_with_breach_pct( vec![ (LatencyBreakdownSlice::QsBatchToPos, 0.35), - // only reaches close to threshold during epoch change - (LatencyBreakdownSlice::QsPosToProposal, 0.6), + // quorum store backpressure is relaxed, so queueing happens here + (LatencyBreakdownSlice::QsPosToProposal, 2.5), // can be adjusted down if less backpressure (LatencyBreakdownSlice::ConsensusProposalToOrdered, 0.85), // can be adjusted down if less backpressure diff --git a/testsuite/testcases/src/lib.rs b/testsuite/testcases/src/lib.rs index 15ec3b1ffbf2a..c284f1f64eda6 100644 --- a/testsuite/testcases/src/lib.rs +++ b/testsuite/testcases/src/lib.rs @@ -46,8 +46,8 @@ use std::{ }; use tokio::runtime::{Handle, Runtime}; -const WARMUP_DURATION_FRACTION: f32 = 0.07; -const COOLDOWN_DURATION_FRACTION: f32 = 0.04; +pub const WARMUP_DURATION_FRACTION: f32 = 0.07; +pub const COOLDOWN_DURATION_FRACTION: f32 = 0.04; async fn batch_update( ctx: &mut NetworkContext<'_>, @@ -254,7 +254,6 @@ impl NetworkTest for dyn NetworkLoadTest { .await .context("no clients replied for start version")?; let emit_job_request = ctx.emit_job.clone(); - let rng = SeedableRng::from_rng(ctx.core().rng())?; let duration = ctx.global_duration; let stats_by_phase = self .network_load_test( @@ -263,7 +262,6 @@ impl NetworkTest for dyn NetworkLoadTest { duration, WARMUP_DURATION_FRACTION, COOLDOWN_DURATION_FRACTION, - rng, ) .await?; @@ -328,155 +326,173 @@ impl NetworkTest for dyn NetworkLoadTest { } } -impl dyn NetworkLoadTest + '_ { - pub async fn network_load_test<'a>( - &self, - ctx: &mut NetworkContext<'a>, - emit_job_request: EmitJobRequest, - duration: Duration, - warmup_duration_fraction: f32, - cooldown_duration_fraction: f32, - rng: StdRng, - ) -> Result> { - let destination = self.setup(ctx).await.context("setup NetworkLoadTest")?; - let nodes_to_send_load_to = destination.get_destination_nodes(ctx.swarm.clone()).await; +pub async fn create_buffered_load( + swarm: Arc>>, + nodes_to_send_load_to: &[PeerId], + emit_job_request: EmitJobRequest, + duration: Duration, + warmup_duration_fraction: f32, + cooldown_duration_fraction: f32, + mut inner_test_and_report: Option<(&dyn NetworkLoadTest, &mut TestReport)>, +) -> Result> { + // Generate some traffic + let (mut emitter, emit_job_request) = create_emitter_and_request( + swarm.clone(), + emit_job_request, + nodes_to_send_load_to, + StdRng::from_entropy(), + ) + .await + .context("create emitter")?; + + let clients = swarm + .read() + .await + .get_clients_for_peers(nodes_to_send_load_to, Duration::from_secs(10)); - // Generate some traffic + let mut stats_tracking_phases = emit_job_request.get_num_phases(); + assert!(stats_tracking_phases > 0 && stats_tracking_phases != 2); + if stats_tracking_phases == 1 { + stats_tracking_phases = 3; + } - let (mut emitter, emit_job_request) = create_emitter_and_request( - ctx.swarm.clone(), + info!("Starting emitting txns for {}s", duration.as_secs()); + let mut job = emitter + .start_job( + swarm.read().await.chain_info().root_account, emit_job_request, - &nodes_to_send_load_to, - rng, + stats_tracking_phases, ) .await - .context("create emitter")?; + .context("start emitter job")?; - let clients = ctx - .swarm - .read() - .await - .get_clients_for_peers(&nodes_to_send_load_to, Duration::from_secs(10)); - - let mut stats_tracking_phases = emit_job_request.get_num_phases(); - assert!(stats_tracking_phases > 0 && stats_tracking_phases != 2); - if stats_tracking_phases == 1 { - stats_tracking_phases = 3; - } - - info!("Starting emitting txns for {}s", duration.as_secs()); - let mut job = emitter - .start_job( - ctx.swarm.read().await.chain_info().root_account, - emit_job_request, - stats_tracking_phases, - ) - .await - .context("start emitter job")?; - - let total_start = PhaseTimingStart::now(); + let total_start = PhaseTimingStart::now(); - let warmup_duration = duration.mul_f32(warmup_duration_fraction); - let cooldown_duration = duration.mul_f32(cooldown_duration_fraction); - let test_duration = duration - warmup_duration - cooldown_duration; - let phase_duration = test_duration.div_f32((stats_tracking_phases - 2) as f32); + let warmup_duration = duration.mul_f32(warmup_duration_fraction); + let cooldown_duration = duration.mul_f32(cooldown_duration_fraction); + let test_duration = duration - warmup_duration - cooldown_duration; + let phase_duration = test_duration.div_f32((stats_tracking_phases - 2) as f32); - job = job.periodic_stat_forward(warmup_duration, 60).await; - info!("{}s warmup finished", warmup_duration.as_secs()); + job = job.periodic_stat_forward(warmup_duration, 60).await; + info!("{}s warmup finished", warmup_duration.as_secs()); - let mut phase_timing = Vec::new(); - let mut phase_start_network_state = Vec::new(); - let test_start = Instant::now(); - for i in 0..stats_tracking_phases - 2 { - phase_start_network_state.push(NetworkState::new(&clients).await); - job.start_next_phase(); + let mut phase_timing = Vec::new(); + let mut phase_start_network_state = Vec::new(); + let test_start = Instant::now(); + for i in 0..stats_tracking_phases - 2 { + phase_start_network_state.push(NetworkState::new(&clients).await); + job.start_next_phase(); - if i > 0 { - info!( - "Starting test phase {} out of {}", - i, - stats_tracking_phases - 2, - ); - } - let phase_start = PhaseTimingStart::now(); + if i > 0 { + info!( + "Starting test phase {} out of {}", + i, + stats_tracking_phases - 2, + ); + } + let phase_start = PhaseTimingStart::now(); - let join_stats = Handle::current().spawn(job.periodic_stat_forward(phase_duration, 60)); - self.test(ctx.swarm.clone(), ctx.report, phase_duration) + let join_stats = Handle::current().spawn(job.periodic_stat_forward(phase_duration, 60)); + if let Some((inner_test, context)) = inner_test_and_report.as_mut() { + inner_test + .test(swarm.clone(), context, phase_duration) .await .context("test NetworkLoadTest")?; - job = join_stats.await.context("join stats")?; - phase_timing.push(phase_start.elapsed()); } - let actual_test_duration = test_start.elapsed(); - info!( - "{}s test finished after {}s", - test_duration.as_secs(), - actual_test_duration.as_secs() - ); - - phase_start_network_state.push(NetworkState::new(&clients).await); - job.start_next_phase(); - let cooldown_start = Instant::now(); + job = join_stats.await.context("join stats")?; + phase_timing.push(phase_start.elapsed()); + } + let actual_test_duration = test_start.elapsed(); + info!( + "{}s test finished after {}s", + test_duration.as_secs(), + actual_test_duration.as_secs() + ); - let cooldown_used = cooldown_start.elapsed(); - if cooldown_used < cooldown_duration { - job = job - .periodic_stat_forward(cooldown_duration - cooldown_used, 60) - .await; - } - info!("{}s cooldown finished", cooldown_duration.as_secs()); + phase_start_network_state.push(NetworkState::new(&clients).await); + job.start_next_phase(); + let cooldown_start = Instant::now(); - let total_timing = total_start.elapsed(); + let cooldown_used = cooldown_start.elapsed(); + if cooldown_used < cooldown_duration { + job = job + .periodic_stat_forward(cooldown_duration - cooldown_used, 60) + .await; + } + info!("{}s cooldown finished", cooldown_duration.as_secs()); + + let total_timing = total_start.elapsed(); + info!( + "Emitting txns ran for {} secs(from {} to {}), stopping job...", + duration.as_secs(), + total_timing.start_unixtime_s, + total_timing.end_unixtime_s, + ); + let stats_by_phase = job.stop_job().await; + + info!("Stopped job"); + info!("Warmup stats: {}", stats_by_phase[0].rate()); + + let mut stats: Option = None; + let mut stats_by_phase_filtered = Vec::new(); + for i in 0..stats_tracking_phases - 2 { + let next_i = i + 1; + let cur = &stats_by_phase[next_i]; + info!("Test stats [test phase {}]: {}", i, cur.rate()); + stats = if let Some(previous) = stats { + Some(&previous + cur) + } else { + Some(cur.clone()) + }; + let latency_breakdown = fetch_latency_breakdown( + swarm.clone(), + phase_timing[i].start_unixtime_s, + phase_timing[i].end_unixtime_s, + ) + .await?; info!( - "Emitting txns ran for {} secs(from {} to {}), stopping job...", - duration.as_secs(), - total_timing.start_unixtime_s, - total_timing.end_unixtime_s, + "Latency breakdown details for phase {}: from {} to {}: {:?}", + i, phase_timing[i].start_unixtime_s, phase_timing[i].end_unixtime_s, latency_breakdown ); - let stats_by_phase = job.stop_job().await; - - info!("Stopped job"); - info!("Warmup stats: {}", stats_by_phase[0].rate()); - - let mut stats: Option = None; - let mut stats_by_phase_filtered = Vec::new(); - for i in 0..stats_tracking_phases - 2 { - let next_i = i + 1; - let cur = &stats_by_phase[next_i]; - info!("Test stats [test phase {}]: {}", i, cur.rate()); - stats = if let Some(previous) = stats { - Some(&previous + cur) - } else { - Some(cur.clone()) - }; - let latency_breakdown = fetch_latency_breakdown( - ctx.swarm.clone(), - phase_timing[i].start_unixtime_s, - phase_timing[i].end_unixtime_s, - ) - .await?; - info!( - "Latency breakdown details for phase {}: from {} to {}: {:?}", - i, - phase_timing[i].start_unixtime_s, - phase_timing[i].end_unixtime_s, - latency_breakdown - ); - stats_by_phase_filtered.push(LoadTestPhaseStats { - emitter_stats: cur.clone(), - actual_duration: phase_timing[i].duration, - phase_start_unixtime_s: phase_timing[i].start_unixtime_s, - phase_end_unixtime_s: phase_timing[i].end_unixtime_s, - ledger_transactions: NetworkState::ledger_transactions( - &phase_start_network_state[i], - &phase_start_network_state[next_i], - ), - latency_breakdown, - }); - } - info!("Cooldown stats: {}", stats_by_phase.last().unwrap().rate()); + stats_by_phase_filtered.push(LoadTestPhaseStats { + emitter_stats: cur.clone(), + actual_duration: phase_timing[i].duration, + phase_start_unixtime_s: phase_timing[i].start_unixtime_s, + phase_end_unixtime_s: phase_timing[i].end_unixtime_s, + ledger_transactions: NetworkState::ledger_transactions( + &phase_start_network_state[i], + &phase_start_network_state[next_i], + ), + latency_breakdown, + }); + } + info!("Cooldown stats: {}", stats_by_phase.last().unwrap().rate()); - Ok(stats_by_phase_filtered) + Ok(stats_by_phase_filtered) +} + +impl dyn NetworkLoadTest + '_ { + pub async fn network_load_test<'a>( + &self, + ctx: &mut NetworkContext<'a>, + emit_job_request: EmitJobRequest, + duration: Duration, + warmup_duration_fraction: f32, + cooldown_duration_fraction: f32, + ) -> Result> { + let destination = self.setup(ctx).await.context("setup NetworkLoadTest")?; + let nodes_to_send_load_to = destination.get_destination_nodes(ctx.swarm.clone()).await; + + create_buffered_load( + ctx.swarm.clone(), + &nodes_to_send_load_to, + emit_job_request, + duration, + warmup_duration_fraction, + cooldown_duration_fraction, + Some((self, ctx.report)), + ) + .await } } diff --git a/testsuite/testcases/src/load_vs_perf_benchmark.rs b/testsuite/testcases/src/load_vs_perf_benchmark.rs index 64123c184453f..dafc07bdb242f 100644 --- a/testsuite/testcases/src/load_vs_perf_benchmark.rs +++ b/testsuite/testcases/src/load_vs_perf_benchmark.rs @@ -188,7 +188,6 @@ impl LoadVsPerfBenchmark { index: usize, duration: Duration, ) -> Result> { - let rng = SeedableRng::from_rng(ctx.core().rng())?; let emit_job_request = workloads.configure(index, ctx.emit_job.clone()); let stats_by_phase = self .test @@ -198,7 +197,6 @@ impl LoadVsPerfBenchmark { duration, PER_TEST_WARMUP_DURATION_FRACTION, PER_TEST_COOLDOWN_DURATION_FRACTION, - rng, ) .await?; diff --git a/testsuite/testcases/src/two_traffics_test.rs b/testsuite/testcases/src/two_traffics_test.rs index 931881f4e6956..f7955c8712bbb 100644 --- a/testsuite/testcases/src/two_traffics_test.rs +++ b/testsuite/testcases/src/two_traffics_test.rs @@ -1,18 +1,17 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{create_emitter_and_request, LoadDestination, NetworkLoadTest}; +use crate::{ + create_buffered_load, LoadDestination, NetworkLoadTest, COOLDOWN_DURATION_FRACTION, + WARMUP_DURATION_FRACTION, +}; use aptos_forge::{ success_criteria::{SuccessCriteria, SuccessCriteriaChecker}, EmitJobRequest, NetworkContextSynchronizer, NetworkTest, Result, Swarm, Test, TestReport, }; use aptos_logger::info; use async_trait::async_trait; -use rand::{rngs::OsRng, Rng, SeedableRng}; -use std::{ - sync::Arc, - time::{Duration, Instant}, -}; +use std::{sync::Arc, time::Duration}; pub struct TwoTrafficsTest { pub inner_traffic: EmitJobRequest, @@ -40,44 +39,33 @@ impl NetworkLoadTest for TwoTrafficsTest { let nodes_to_send_load_to = LoadDestination::FullnodesOtherwiseValidators .get_destination_nodes(swarm.clone()) .await; - let rng = ::rand::rngs::StdRng::from_seed(OsRng.gen()); - let (emitter, emit_job_request) = create_emitter_and_request( - swarm.clone(), - self.inner_traffic.clone(), + let stats_by_phase = create_buffered_load( + swarm, &nodes_to_send_load_to, - rng, + self.inner_traffic.clone(), + duration, + WARMUP_DURATION_FRACTION, + COOLDOWN_DURATION_FRACTION, + None, ) .await?; - let test_start = Instant::now(); + for phase_stats in stats_by_phase.into_iter() { + report.report_txn_stats( + format!("{}: inner traffic", self.name()), + &phase_stats.emitter_stats, + ); - let stats = emitter - .emit_txn_for( - swarm.read().await.chain_info().root_account, - emit_job_request, - duration, - ) - .await?; + SuccessCriteriaChecker::check_core_for_success( + &self.inner_success_criteria, + report, + &phase_stats.emitter_stats.rate(), + None, + Some("inner traffic".to_string()), + )?; + } - let actual_test_duration = test_start.elapsed(); - info!( - "End to end duration: {}s, while txn emitter lasted: {}s", - actual_test_duration.as_secs(), - stats.lasted.as_secs() - ); - - let rate = stats.rate(); - - report.report_txn_stats(format!("{}: inner traffic", self.name()), &stats); - - SuccessCriteriaChecker::check_core_for_success( - &self.inner_success_criteria, - report, - &rate, - None, - Some("inner traffic".to_string()), - )?; Ok(()) } }