diff --git a/.github/workflows/forge-stable.yaml b/.github/workflows/forge-stable.yaml
index e6faf65e62f67..9328faa1cbf91 100644
--- a/.github/workflows/forge-stable.yaml
+++ b/.github/workflows/forge-stable.yaml
@@ -127,11 +127,23 @@ jobs:
       FORGE_TEST_SUITE: realistic_env_load_sweep
       POST_TO_SLACK: true

-  run-forge-realistic-env-graceful-overload:
+  run-forge-realistic-env-workload-sweep:
     if: ${{ github.event_name != 'pull_request' && always() }}
     needs: [determine-test-metadata, run-forge-realistic-env-load-sweep] # Only run after the previous job completes
     uses: aptos-labs/aptos-core/.github/workflows/workflow-run-forge.yaml@main
     secrets: inherit
+    with:
+      IMAGE_TAG: ${{ needs.determine-test-metadata.outputs.IMAGE_TAG }}
+      FORGE_NAMESPACE: forge-realistic-env-workload-sweep-${{ needs.determine-test-metadata.outputs.IMAGE_TAG }}
+      FORGE_RUNNER_DURATION_SECS: 1600 # Run for ~27 minutes (4 tests, each for 400 seconds)
+      FORGE_TEST_SUITE: realistic_env_workload_sweep
+      POST_TO_SLACK: true
+
+  run-forge-realistic-env-graceful-overload:
+    if: ${{ github.event_name != 'pull_request' && always() }}
+    needs: [determine-test-metadata, run-forge-realistic-env-workload-sweep] # Only run after the previous job completes
+    uses: aptos-labs/aptos-core/.github/workflows/workflow-run-forge.yaml@main
+    secrets: inherit
     with:
       IMAGE_TAG: ${{ needs.determine-test-metadata.outputs.IMAGE_TAG }}
       FORGE_NAMESPACE: forge-realistic-env-graceful-overload-${{ needs.determine-test-metadata.outputs.IMAGE_TAG }}
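The 1600-second FORGE_RUNNER_DURATION_SECS is simply the per-workload run time multiplied by the number of workloads in the new suite. A minimal, illustrative Rust sketch of that arithmetic (not code from this PR; any buffer Forge adds between runs is ignored):

// Illustrative only: 4 workloads at 400 seconds each give the 1600-second
// runner duration configured above.
fn main() {
    let num_workloads: u64 = 4;
    let secs_per_workload: u64 = 400;
    let runner_duration_secs = num_workloads * secs_per_workload;
    assert_eq!(runner_duration_secs, 1600);
    println!("FORGE_RUNNER_DURATION_SECS = {runner_duration_secs}");
}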
diff --git a/testsuite/forge-cli/src/main.rs b/testsuite/forge-cli/src/main.rs
index 2daf55c8f26aa..e1d2ee6db1539 100644
--- a/testsuite/forge-cli/src/main.rs
+++ b/testsuite/forge-cli/src/main.rs
@@ -527,6 +527,7 @@ fn single_test_suite(
         // Rest of the tests:
         "realistic_env_max_load_large" => realistic_env_max_load_test(duration, test_cmd, 20, 10),
         "realistic_env_load_sweep" => realistic_env_load_sweep_test(),
+        "realistic_env_workload_sweep" => realistic_env_workload_sweep_test(),
         "realistic_env_graceful_overload" => realistic_env_graceful_overload(),
         "realistic_network_tuned_for_throughput" => realistic_network_tuned_for_throughput_test(),
         "epoch_changer_performance" => epoch_changer_performance(),
@@ -797,31 +798,19 @@ fn consensus_stress_test() -> ForgeConfig {
     })
 }

-fn realistic_env_load_sweep_test() -> ForgeConfig {
+fn realistic_env_sweep_wrap(
+    num_validators: usize,
+    num_fullnodes: usize,
+    test: LoadVsPerfBenchmark,
+) -> ForgeConfig {
     ForgeConfig::default()
-        .with_initial_validator_count(NonZeroUsize::new(20).unwrap())
-        .with_initial_fullnode_count(10)
-        .add_network_test(wrap_with_realistic_env(LoadVsPerfBenchmark {
-            test: Box::new(PerformanceBenchmark),
-            workloads: Workloads::TPS(&[10, 100, 1000, 3000, 5000]),
-            criteria: [
-                (9, 1.5, 3., 4.),
-                (95, 1.5, 3., 4.),
-                (950, 2., 3., 4.),
-                (2750, 2.5, 3.5, 4.5),
-                (4600, 3., 4., 5.),
-            ]
-            .into_iter()
-            .map(|(min_tps, max_lat_p50, max_lat_p90, max_lat_p99)| {
-                SuccessCriteria::new(min_tps)
-                    .add_max_expired_tps(0)
-                    .add_max_failed_submission_tps(0)
-                    .add_latency_threshold(max_lat_p50, LatencyType::P50)
-                    .add_latency_threshold(max_lat_p90, LatencyType::P90)
-                    .add_latency_threshold(max_lat_p99, LatencyType::P99)
-            })
-            .collect(),
+        .with_initial_validator_count(NonZeroUsize::new(num_validators).unwrap())
+        .with_initial_fullnode_count(num_fullnodes)
+        .with_node_helm_config_fn(Arc::new(move |helm_values| {
+            helm_values["validator"]["config"]["execution"]
+                ["processed_transactions_detailed_counters"] = true.into();
         }))
+        .add_network_test(wrap_with_realistic_env(test))
         // Test inherits the main EmitJobRequest, so update here for more precise latency measurements
         .with_emit_job(
             EmitJobRequest::default().latency_polling_interval(Duration::from_millis(100)),
         )
         .with_success_criteria(
             SuccessCriteria::new(0)
                 .add_no_restarts()
                 .add_wait_for_catchup_s(60)
                 .add_chain_progress(StateProgressThreshold {
                     max_no_progress_secs: 30.0,
                     max_round_gap: 10,
                 }),
         )
 }

@@ -841,6 +830,98 @@
+fn realistic_env_load_sweep_test() -> ForgeConfig {
+    realistic_env_sweep_wrap(20, 10, LoadVsPerfBenchmark {
+        test: Box::new(PerformanceBenchmark),
+        workloads: Workloads::TPS(&[10, 100, 1000, 3000, 5000]),
+        criteria: [
+            (9, 1.5, 3., 4.),
+            (95, 1.5, 3., 4.),
+            (950, 2., 3., 4.),
+            (2750, 2.5, 3.5, 4.5),
+            (4600, 3., 4., 5.),
+        ]
+        .into_iter()
+        .map(|(min_tps, max_lat_p50, max_lat_p90, max_lat_p99)| {
+            SuccessCriteria::new(min_tps)
+                .add_max_expired_tps(0)
+                .add_max_failed_submission_tps(0)
+                .add_latency_threshold(max_lat_p50, LatencyType::P50)
+                .add_latency_threshold(max_lat_p90, LatencyType::P90)
+                .add_latency_threshold(max_lat_p99, LatencyType::P99)
+        })
+        .collect(),
+    })
+}
+
+fn realistic_env_workload_sweep_test() -> ForgeConfig {
+    realistic_env_sweep_wrap(7, 3, LoadVsPerfBenchmark {
+        test: Box::new(PerformanceBenchmark),
+        workloads: Workloads::TRANSACTIONS(&[
+            TransactionWorkload {
+                transaction_type: TransactionTypeArg::CoinTransfer,
+                num_modules: 1,
+                unique_senders: false,
+                mempool_backlog: 20000,
+            },
+            TransactionWorkload {
+                transaction_type: TransactionTypeArg::NoOp,
+                num_modules: 100,
+                unique_senders: false,
+                mempool_backlog: 20000,
+            },
+            TransactionWorkload {
+                transaction_type: TransactionTypeArg::ModifyGlobalResource,
+                num_modules: 1,
+                unique_senders: true,
+                mempool_backlog: 20000,
+            },
+            TransactionWorkload {
+                transaction_type: TransactionTypeArg::TokenV2AmbassadorMint,
+                num_modules: 1,
+                unique_senders: true,
+                mempool_backlog: 10000,
+            },
+            // transactions get rejected, to fix.
+            // TransactionWorkload {
+            //     transaction_type: TransactionTypeArg::PublishPackage,
+            //     num_modules: 1,
+            //     unique_senders: true,
+            //     mempool_backlog: 1000,
+            // },
+        ]),
+        // Investigate/improve to make latency more predictable on different workloads
+        criteria: [
+            (3700, 0.35, 0.5, 0.8, 0.65),
+            (2800, 0.35, 0.5, 1.2, 1.2),
+            (1800, 0.35, 0.5, 1.5, 2.7),
+            (950, 0.35, 0.65, 1.5, 2.7),
+            // (150, 0.5, 1.0, 1.5, 0.65),
+        ]
+        .into_iter()
+        .map(
+            |(min_tps, batch_to_pos, pos_to_proposal, proposal_to_ordered, ordered_to_commit)| {
+                SuccessCriteria::new(min_tps)
+                    .add_max_expired_tps(200)
+                    .add_max_failed_submission_tps(200)
+                    .add_latency_breakdown_threshold(LatencyBreakdownThreshold::new_strict(vec![
+                        (LatencyBreakdownSlice::QsBatchToPos, batch_to_pos),
+                        (LatencyBreakdownSlice::QsPosToProposal, pos_to_proposal),
+                        (
+                            LatencyBreakdownSlice::ConsensusProposalToOrdered,
+                            proposal_to_ordered,
+                        ),
+                        (
+                            LatencyBreakdownSlice::ConsensusOrderedToCommit,
+                            ordered_to_commit,
+                        ),
+                    ]))
+            },
+        )
+        .collect(),
+    })
+}
+
 fn load_vs_perf_benchmark() -> ForgeConfig {
     ForgeConfig::default()
         .with_initial_validator_count(NonZeroUsize::new(20).unwrap())
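With the shared setup factored into realistic_env_sweep_wrap, another sweep suite only has to supply node counts and a LoadVsPerfBenchmark. A hypothetical sketch building on the helper above (the suite name, node counts, TPS points, and criteria are made up; it relies on the same imports as the surrounding file):

// Hypothetical extra sweep reusing realistic_env_sweep_wrap; illustrative only.
fn realistic_env_small_tps_sweep_test() -> ForgeConfig {
    realistic_env_sweep_wrap(7, 3, LoadVsPerfBenchmark {
        test: Box::new(PerformanceBenchmark),
        workloads: Workloads::TPS(&[100, 500, 1000]),
        criteria: [95, 450, 900]
            .into_iter()
            .map(|min_tps| SuccessCriteria::new(min_tps))
            .collect(),
    })
}

To be runnable it would also need a match arm in single_test_suite, mirroring the "realistic_env_workload_sweep" arm added above.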
@@ -875,9 +956,6 @@ fn workload_vs_perf_benchmark() -> ForgeConfig {
             helm_values["validator"]["config"]["execution"]
                 ["processed_transactions_detailed_counters"] = true.into();
         }))
-        // .with_emit_job(EmitJobRequest::default().mode(EmitJobMode::MaxLoad {
-        //     mempool_backlog: 10000,
-        // }))
         .add_network_test(LoadVsPerfBenchmark {
             test: Box::new(PerformanceBenchmark),
             workloads: Workloads::TRANSACTIONS(&[
@@ -885,41 +963,49 @@
                     transaction_type: TransactionTypeArg::NoOp,
                     num_modules: 1,
                     unique_senders: false,
+                    mempool_backlog: 20000,
                 },
                 TransactionWorkload {
                     transaction_type: TransactionTypeArg::NoOp,
                     num_modules: 1,
                     unique_senders: true,
+                    mempool_backlog: 20000,
                 },
                 TransactionWorkload {
                     transaction_type: TransactionTypeArg::NoOp,
                     num_modules: 1000,
                     unique_senders: false,
+                    mempool_backlog: 20000,
                 },
                 TransactionWorkload {
                     transaction_type: TransactionTypeArg::CoinTransfer,
                     num_modules: 1,
                     unique_senders: true,
+                    mempool_backlog: 20000,
                 },
                 TransactionWorkload {
                     transaction_type: TransactionTypeArg::CoinTransfer,
                     num_modules: 1,
                     unique_senders: true,
+                    mempool_backlog: 20000,
                 },
                 TransactionWorkload {
                     transaction_type: TransactionTypeArg::AccountResource32B,
                     num_modules: 1,
                     unique_senders: true,
+                    mempool_backlog: 20000,
                 },
                 TransactionWorkload {
                     transaction_type: TransactionTypeArg::AccountResource1KB,
                     num_modules: 1,
                     unique_senders: true,
+                    mempool_backlog: 20000,
                 },
                 TransactionWorkload {
                     transaction_type: TransactionTypeArg::PublishPackage,
                     num_modules: 1,
                     unique_senders: true,
+                    mempool_backlog: 20000,
                 },
             ]),
             criteria: Vec::new(),
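The commented-out suite-wide MaxLoad emit job above is superseded by the new per-workload mempool_backlog field; as the load_vs_perf_benchmark.rs changes further down show, each workload now derives its own emit mode. A minimal sketch of that mapping, assuming the EmitJobMode and TransactionWorkload definitions shown in this diff:

// Sketch only: mirrors the request.mode(...) call added to
// TransactionWorkload::configure later in this diff; not a new API.
fn emit_mode_for(workload: &TransactionWorkload) -> EmitJobMode {
    EmitJobMode::MaxLoad {
        mempool_backlog: workload.mempool_backlog,
    }
}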
diff --git a/testsuite/forge/src/success_criteria.rs b/testsuite/forge/src/success_criteria.rs
index 5856ee21c905d..88fd7c40fc34e 100644
--- a/testsuite/forge/src/success_criteria.rs
+++ b/testsuite/forge/src/success_criteria.rs
@@ -114,10 +114,17 @@ impl LatencyBreakdownThreshold {
         }
     }

-    pub fn ensure_threshold(&self, metrics: &LatencyBreakdown) -> anyhow::Result<()> {
+    pub fn ensure_threshold(
+        &self,
+        metrics: &LatencyBreakdown,
+        traffic_name_addition: &String,
+    ) -> anyhow::Result<()> {
         for (slice, threshold) in &self.thresholds {
             let samples = metrics.get_samples(slice);
-            threshold.ensure_metrics_threshold(&format!("{:?}", slice), samples.get())?;
+            threshold.ensure_metrics_threshold(
+                &format!("{:?}{}", slice, traffic_name_addition),
+                samples.get(),
+            )?;
         }
         Ok(())
     }
@@ -220,7 +227,8 @@ impl SuccessCriteriaChecker {
             &traffic_name_addition,
         )?;
         if let Some(latency_breakdown_thresholds) = &success_criteria.latency_breakdown_thresholds {
-            latency_breakdown_thresholds.ensure_threshold(latency_breakdown.unwrap())?;
+            latency_breakdown_thresholds
+                .ensure_threshold(latency_breakdown.unwrap(), &traffic_name_addition)?;
         }
         Ok(())
     }
@@ -244,22 +252,24 @@ impl SuccessCriteriaChecker {
         );

         let stats_rate = stats.rate();
+        let no_traffic_name_addition = "".to_string();
         Self::check_throughput(
             success_criteria.min_avg_tps,
             success_criteria.max_expired_tps,
             success_criteria.max_failed_submission_tps,
             &stats_rate,
-            &"".to_string(),
+            &no_traffic_name_addition,
         )?;
         Self::check_latency(
             &success_criteria.latency_thresholds,
             &stats_rate,
-            &"".to_string(),
+            &no_traffic_name_addition,
         )?;
         if let Some(latency_breakdown_thresholds) = &success_criteria.latency_breakdown_thresholds {
-            latency_breakdown_thresholds.ensure_threshold(latency_breakdown)?;
+            latency_breakdown_thresholds
+                .ensure_threshold(latency_breakdown, &no_traffic_name_addition)?;
         }

         if let Some(timeout) = success_criteria.wait_for_all_nodes_to_catchup {
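The behavioral change here is that latency-breakdown failures are now labeled per traffic type: the slice name gets the same traffic_name_addition suffix already passed to the throughput and latency checks. A self-contained sketch of the resulting label (the suffix text is made up; how callers build it is not part of this diff):

// Illustrative only: mirrors format!("{:?}{}", slice, traffic_name_addition)
// from the new ensure_threshold.
fn breakdown_metric_label(slice_debug_name: &str, traffic_name_addition: &str) -> String {
    format!("{}{}", slice_debug_name, traffic_name_addition)
}

fn main() {
    // Single-run checks pass an empty suffix, so labels are unchanged.
    assert_eq!(breakdown_metric_label("QsBatchToPos", ""), "QsBatchToPos");
    // Sweep runs can pass a per-workload suffix so failures name the workload.
    assert_eq!(
        breakdown_metric_label("QsBatchToPos", " for workload #2"),
        "QsBatchToPos for workload #2"
    );
}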
diff --git a/testsuite/testcases/src/load_vs_perf_benchmark.rs b/testsuite/testcases/src/load_vs_perf_benchmark.rs
index a282eb70a542b..d69aef659b4da 100644
--- a/testsuite/testcases/src/load_vs_perf_benchmark.rs
+++ b/testsuite/testcases/src/load_vs_perf_benchmark.rs
@@ -10,10 +10,7 @@ use aptos_forge::{
 };
 use aptos_logger::info;
 use rand::SeedableRng;
-use std::{
-    fmt::{self, Debug, Display},
-    time::Duration,
-};
+use std::{fmt::Debug, time::Duration};
 use tokio::runtime::Runtime;

 pub struct SingleRunStats {
@@ -24,6 +21,7 @@ pub struct SingleRunStats {
     actual_duration: Duration,
 }

+#[derive(Debug)]
 pub enum Workloads {
     TPS(&'static [usize]),
     TRANSACTIONS(&'static [TransactionWorkload]),
@@ -37,10 +35,29 @@ impl Workloads {
         }
     }

-    fn name(&self, index: usize) -> String {
+    fn type_name(&self) -> String {
+        match self {
+            Self::TPS(_) => "Load (TPS)".to_string(),
+            Self::TRANSACTIONS(_) => "Workload".to_string(),
+        }
+    }
+
+    fn phase_name(&self, index: usize, phase: usize) -> String {
         match self {
-            Self::TPS(tpss) => tpss[index].to_string(),
-            Self::TRANSACTIONS(workloads) => workloads[index].to_string(),
+            Self::TPS(tpss) => {
+                assert_eq!(phase, 0);
+                format!("{}", tpss[index])
+            },
+            Self::TRANSACTIONS(workloads) => format!(
+                "{}{}: {}",
+                index,
+                if workloads[index].is_phased() {
+                    format!(": ph{}", phase)
+                } else {
+                    "".to_string()
+                },
+                workloads[index].phase_name(phase)
+            ),
         }
     }

@@ -57,16 +74,23 @@ pub struct TransactionWorkload {
     pub transaction_type: TransactionTypeArg,
     pub num_modules: usize,
     pub unique_senders: bool,
+    pub mempool_backlog: usize,
 }

 impl TransactionWorkload {
+    fn is_phased(&self) -> bool {
+        self.unique_senders
+    }
+
     fn configure(&self, request: EmitJobRequest) -> EmitJobRequest {
         let account_creation_type =
             TransactionTypeArg::AccountGenerationLargePool.materialize(1, false);

-        if self.unique_senders {
-            request.transaction_type(self.transaction_type.materialize(self.num_modules, false))
-        } else {
+        let request = request.mode(EmitJobMode::MaxLoad {
+            mempool_backlog: self.mempool_backlog,
+        });
+
+        if self.is_phased() {
             let write_type = self.transaction_type.materialize(self.num_modules, true);
             request.transaction_mix_per_phase(vec![
                 // warmup
@@ -76,13 +100,27 @@
                 // cooldown
                 vec![(write_type, 1)],
             ])
+        } else {
+            request.transaction_type(self.transaction_type.materialize(self.num_modules, false))
         }
     }
-}

-impl Display for TransactionWorkload {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        Debug::fmt(self, f)
+    fn phase_name(&self, phase: usize) -> String {
+        format!(
+            "{}{}[{:.1}k]",
+            match (self.is_phased(), phase) {
+                (true, 0) => "CreateBurnerAccounts".to_string(),
+                (true, 1) => format!("{:?}", self.transaction_type),
+                (false, 0) => format!("{:?}", self.transaction_type),
+                _ => unreachable!(),
+            },
+            if self.num_modules > 1 {
+                format!("({} modules)", self.num_modules)
+            } else {
+                "".to_string()
+            },
+            self.mempool_backlog as f32 / 1000.0,
+        )
     }
 }

@@ -120,14 +158,9 @@ impl LoadVsPerfBenchmark {
         )?;

         let mut result = vec![];
-        let phased = stats_by_phase.len() > 1;
         for (phase, phase_stats) in stats_by_phase.into_iter().enumerate() {
             result.push(SingleRunStats {
-                name: if phased {
-                    format!("{}_phase_{}", workloads.name(index), phase)
-                } else {
-                    workloads.name(index)
-                },
+                name: workloads.phase_name(index, phase),
                 stats: phase_stats.emitter_stats,
                 latency_breakdown: phase_stats.latency_breakdown,
                 ledger_transactions: phase_stats.ledger_transactions,
@@ -163,37 +196,34 @@ impl NetworkTest for LoadVsPerfBenchmark {
                 std::thread::sleep(buffer);
             }

-            info!("Starting for {}", self.workloads.name(index));
-            results.append(&mut self.evaluate_single(
-                ctx,
-                &self.workloads,
-                index,
-                individual_duration,
-            )?);
+            info!("Starting for {:?}", self.workloads);
+            results.push(self.evaluate_single(ctx, &self.workloads, index, individual_duration)?);
             // Note: uncomment below to perform reconfig during a test
             // let mut aptos_info = ctx.swarm().aptos_public_info();
             // runtime.block_on(aptos_info.reconfig());

-            let table = to_table(&results);
+            let table = to_table(self.workloads.type_name(), &results);
             for line in table {
                 info!("{}", line);
             }
         }

-        let table = to_table(&results);
+        let table = to_table(self.workloads.type_name(), &results);
         for line in table {
             ctx.report.report_text(line);
         }

         for (index, result) in results.iter().enumerate() {
-            let rate = result.stats.rate();
+            // always take last phase for success criteria
+            let target_result = &result[result.len() - 1];
+            let rate = target_result.stats.rate();
             if let Some(criteria) = self.criteria.get(index) {
                 SuccessCriteriaChecker::check_core_for_success(
                     criteria,
                     ctx.report,
                     &rate,
-                    Some(&result.latency_breakdown),
-                    Some(result.name.clone()),
+                    Some(&target_result.latency_breakdown),
+                    Some(target_result.name.clone()),
                 )?;
             }
         }
@@ -201,11 +231,11 @@
     }
 }

-fn to_table(results: &[SingleRunStats]) -> Vec<String> {
+fn to_table(type_name: String, results: &[Vec<SingleRunStats>]) -> Vec<String> {
     let mut table = Vec::new();
     table.push(format!(
-        "{: <30} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12}",
-        "workload",
+        "{: <40} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12}",
+        type_name,
         "submitted/s",
         "committed/s",
         "expired/s",
@@ -222,26 +252,28 @@
         "actual dur"
     ));

-    for result in results {
-        let rate = result.stats.rate();
-        table.push(format!(
-            "{: <30} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12}",
-            result.name,
-            rate.submitted,
-            rate.committed,
-            rate.expired,
-            rate.failed_submission,
-            result.ledger_transactions / result.actual_duration.as_secs(),
-            rate.latency,
-            rate.p50_latency,
-            rate.p90_latency,
-            rate.p99_latency,
-            result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsBatchToPos).max_sample(),
-            result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsPosToProposal).max_sample(),
-            result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusProposalToOrdered).max_sample(),
-            result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusOrderedToCommit).max_sample(),
-            result.actual_duration.as_secs()
-        ));
+    for run_results in results {
+        for result in run_results {
+            let rate = result.stats.rate();
+            table.push(format!(
+                "{: <40} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12}",
+                result.name,
+                rate.submitted,
+                rate.committed,
+                rate.expired,
+                rate.failed_submission,
+                result.ledger_transactions / result.actual_duration.as_secs(),
+                rate.latency,
+                rate.p50_latency,
+                rate.p90_latency,
+                rate.p99_latency,
+                result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsBatchToPos).max_sample(),
+                result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsPosToProposal).max_sample(),
+                result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusProposalToOrdered).max_sample(),
+                result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusOrderedToCommit).max_sample(),
+                result.actual_duration.as_secs()
+            ));
+        }
+    }

     table
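For readers skimming the table changes: each row is now named by Workloads::phase_name instead of a Debug dump of the workload. A stripped-down, standalone sketch of the per-workload part of that naming (plain strings stand in for the real TransactionTypeArg/TransactionWorkload types; format strings match the diff):

// Standalone illustration of TransactionWorkload::phase_name above.
fn workload_phase_name(
    transaction_type: &str, // stands in for format!("{:?}", self.transaction_type)
    num_modules: usize,
    mempool_backlog: usize,
    is_phased: bool,
    phase: usize,
) -> String {
    format!(
        "{}{}[{:.1}k]",
        match (is_phased, phase) {
            (true, 0) => "CreateBurnerAccounts".to_string(),
            (true, 1) | (false, 0) => transaction_type.to_string(),
            _ => unreachable!(),
        },
        if num_modules > 1 {
            format!("({} modules)", num_modules)
        } else {
            "".to_string()
        },
        mempool_backlog as f32 / 1000.0,
    )
}

fn main() {
    // Non-phased workload, e.g. the CoinTransfer entry with a 20k backlog:
    assert_eq!(
        workload_phase_name("CoinTransfer", 1, 20000, false, 0),
        "CoinTransfer[20.0k]"
    );
    // Phased workload, main phase, e.g. TokenV2AmbassadorMint with a 10k backlog:
    assert_eq!(
        workload_phase_name("TokenV2AmbassadorMint", 1, 10000, true, 1),
        "TokenV2AmbassadorMint[10.0k]"
    );
    // Workloads::phase_name then prefixes the index and phase, producing table
    // rows like "3: ph1: TokenV2AmbassadorMint[10.0k]".
}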