diff --git a/testsuite/forge-cli/src/main.rs b/testsuite/forge-cli/src/main.rs index 8cda6b226565e8..5f0db3170468df 100644 --- a/testsuite/forge-cli/src/main.rs +++ b/testsuite/forge-cli/src/main.rs @@ -6,7 +6,9 @@ use anyhow::{format_err, Context, Result}; use aptos_config::config::{ChainHealthBackoffValues, ConsensusConfig, PipelineBackpressureValues}; use aptos_forge::{ args::TransactionTypeArg, - success_criteria::{LatencyType, StateProgressThreshold, SuccessCriteria}, + success_criteria::{ + LatencyBreakdownThreshold, LatencyType, StateProgressThreshold, SuccessCriteria, + }, system_metrics::{MetricsThreshold, SystemMetricsThreshold}, ForgeConfig, Options, *, }; @@ -41,7 +43,7 @@ use aptos_testcases::{ validator_reboot_stress_test::ValidatorRebootStressTest, CompositeNetworkTest, }; -use clap::{Parser, Subcommand}; +use clap::{Parser, Subcommand, __derive_refs::once_cell::sync::Lazy}; use futures::stream::{FuturesUnordered, StreamExt}; use rand::{rngs::ThreadRng, seq::SliceRandom, Rng}; use std::{ @@ -216,6 +218,24 @@ struct Resize { enable_haproxy: bool, } +// common metrics thresholds: +static SYSTEM_12_CORES_5GB_THRESHOLD: Lazy = Lazy::new(|| { + SystemMetricsThreshold::new( + // Check that we don't use more than 12 CPU cores for 30% of the time. + MetricsThreshold::new(12.0, 30), + // Check that we don't use more than 5 GB of memory for 30% of the time. + MetricsThreshold::new_gb(5.0, 30), + ) +}); +static SYSTEM_12_CORES_10GB_THRESHOLD: Lazy = Lazy::new(|| { + SystemMetricsThreshold::new( + // Check that we don't use more than 12 CPU cores for 30% of the time. + MetricsThreshold::new(12.0, 30), + // Check that we don't use more than 5 GB of memory for 30% of the time. + MetricsThreshold::new_gb(10.0, 30), + ) +}); + /// Make an easy to remember random namespace for your testnet fn random_namespace(dictionary: Vec, rng: &mut R) -> Result { // Pick four random words @@ -679,12 +699,7 @@ fn twin_validator_test() -> ForgeConfig { SuccessCriteria::new(5500) .add_no_restarts() .add_wait_for_catchup_s(60) - .add_system_metrics_threshold(SystemMetricsThreshold::new( - // Check that we don't use more than 12 CPU cores for 30% of the time. - MetricsThreshold::new(12, 30), - // Check that we don't use more than 5 GB of memory for 30% of the time. - MetricsThreshold::new(5 * 1024 * 1024 * 1024, 30), - )) + .add_system_metrics_threshold(SYSTEM_12_CORES_5GB_THRESHOLD.clone()) .add_chain_progress(StateProgressThreshold { max_no_progress_secs: 10.0, max_round_gap: 4, @@ -958,9 +973,9 @@ fn graceful_overload() -> ForgeConfig { .add_wait_for_catchup_s(120) .add_system_metrics_threshold(SystemMetricsThreshold::new( // Check that we don't use more than 12 CPU cores for 30% of the time. - MetricsThreshold::new(12, 40), + MetricsThreshold::new(12.0, 40), // Check that we don't use more than 5 GB of memory for 30% of the time. - MetricsThreshold::new(5 * 1024 * 1024 * 1024, 30), + MetricsThreshold::new_gb(5.0, 30), )) .add_latency_threshold(10.0, LatencyType::P50) .add_latency_threshold(30.0, LatencyType::P90) @@ -1009,9 +1024,9 @@ fn realistic_env_graceful_overload() -> ForgeConfig { .add_system_metrics_threshold(SystemMetricsThreshold::new( // overload test uses more CPUs than others, so increase the limit // Check that we don't use more than 18 CPU cores for 30% of the time. - MetricsThreshold::new(18, 40), + MetricsThreshold::new(18.0, 40), // Check that we don't use more than 5 GB of memory for 30% of the time. - MetricsThreshold::new(5 * 1024 * 1024 * 1024, 30), + MetricsThreshold::new_gb(5.0, 30), )) .add_latency_threshold(10.0, LatencyType::P50) .add_latency_threshold(30.0, LatencyType::P90) @@ -1417,12 +1432,7 @@ fn validators_join_and_leave() -> ForgeConfig { SuccessCriteria::new(5000) .add_no_restarts() .add_wait_for_catchup_s(240) - .add_system_metrics_threshold(SystemMetricsThreshold::new( - // Check that we don't use more than 12 CPU cores for 30% of the time. - MetricsThreshold::new(12, 30), - // Check that we don't use more than 10 GB of memory for 30% of the time. - MetricsThreshold::new(10 * 1024 * 1024 * 1024, 30), - )) + .add_system_metrics_threshold(SYSTEM_12_CORES_10GB_THRESHOLD.clone()) .add_chain_progress(StateProgressThreshold { max_no_progress_secs: 10.0, max_round_gap: 4, @@ -1452,12 +1462,7 @@ fn land_blocking_test_suite(duration: Duration) -> ForgeConfig { // Give at least 60s for catchup, give 10% of the run for longer durations. (duration.as_secs() / 10).max(60), ) - .add_system_metrics_threshold(SystemMetricsThreshold::new( - // Check that we don't use more than 12 CPU cores for 30% of the time. - MetricsThreshold::new(12, 30), - // Check that we don't use more than 10 GB of memory for 30% of the time. - MetricsThreshold::new(10 * 1024 * 1024 * 1024, 30), - )) + .add_system_metrics_threshold(SYSTEM_12_CORES_10GB_THRESHOLD.clone()) .add_chain_progress(StateProgressThreshold { max_no_progress_secs: 10.0, max_round_gap: 4, @@ -1519,12 +1524,16 @@ fn realistic_env_max_load_test( ) .add_system_metrics_threshold(SystemMetricsThreshold::new( // Check that we don't use more than 12 CPU cores for 30% of the time. - MetricsThreshold::new(14, max_cpu_threshold), + MetricsThreshold::new(14.0, max_cpu_threshold), // Check that we don't use more than 10 GB of memory for 30% of the time. - MetricsThreshold::new(10 * 1024 * 1024 * 1024, 30), + MetricsThreshold::new_gb(10.0, 30), )) .add_latency_threshold(3.0, LatencyType::P50) .add_latency_threshold(5.0, LatencyType::P90) + .add_latency_breakdown_threshold(LatencyBreakdownThreshold { + block_create_to_ordered_threshold: MetricsThreshold::new(1.5, 0), + block_create_to_commit_threshold: MetricsThreshold::new(2.5, 0), + }) .add_chain_progress(StateProgressThreshold { max_no_progress_secs: 10.0, max_round_gap: 4, @@ -1580,9 +1589,9 @@ fn realistic_network_tuned_for_throughput_test() -> ForgeConfig { // Tuned for throughput uses more cores than regular tests, // as it achieves higher throughput. // Check that we don't use more than 14 CPU cores for 30% of the time. - MetricsThreshold::new(14, 30), + MetricsThreshold::new(14.0, 30), // Check that we don't use more than 10 GB of memory for 30% of the time. - MetricsThreshold::new(10 * 1024 * 1024 * 1024, 30), + MetricsThreshold::new_gb(10.0, 30), )) .add_chain_progress(StateProgressThreshold { max_no_progress_secs: 10.0, @@ -1612,12 +1621,7 @@ fn chaos_test_suite(duration: Duration) -> ForgeConfig { }, ) .add_no_restarts() - .add_system_metrics_threshold(SystemMetricsThreshold::new( - // Check that we don't use more than 12 CPU cores for 30% of the time. - MetricsThreshold::new(12, 30), - // Check that we don't use more than 5 GB of memory for 30% of the time. - MetricsThreshold::new(5 * 1024 * 1024 * 1024, 30), - )), + .add_system_metrics_threshold(SYSTEM_12_CORES_5GB_THRESHOLD.clone()), ) } @@ -1791,12 +1795,7 @@ fn quorum_store_reconfig_enable_test() -> ForgeConfig { SuccessCriteria::new(5000) .add_no_restarts() .add_wait_for_catchup_s(240) - .add_system_metrics_threshold(SystemMetricsThreshold::new( - // Check that we don't use more than 12 CPU cores for 30% of the time. - MetricsThreshold::new(12, 30), - // Check that we don't use more than 10 GB of memory for 30% of the time. - MetricsThreshold::new(10 * 1024 * 1024 * 1024, 30), - )) + .add_system_metrics_threshold(SYSTEM_12_CORES_10GB_THRESHOLD.clone()) .add_chain_progress(StateProgressThreshold { max_no_progress_secs: 10.0, max_round_gap: 4, @@ -1863,12 +1862,7 @@ fn multiregion_benchmark_test() -> ForgeConfig { // Give at least 60s for catchup, give 10% of the run for longer durations. 180, ) - .add_system_metrics_threshold(SystemMetricsThreshold::new( - // Check that we don't use more than 12 CPU cores for 30% of the time. - MetricsThreshold::new(12, 30), - // Check that we don't use more than 10 GB of memory for 30% of the time. - MetricsThreshold::new(10 * 1024 * 1024 * 1024, 30), - )) + .add_system_metrics_threshold(SYSTEM_12_CORES_10GB_THRESHOLD.clone()) .add_chain_progress(StateProgressThreshold { max_no_progress_secs: 10.0, max_round_gap: 4, diff --git a/testsuite/forge/src/backend/k8s/prometheus.rs b/testsuite/forge/src/backend/k8s/prometheus.rs index b4ac6e4cdfb75c..36bf8ce8c3c6b5 100644 --- a/testsuite/forge/src/backend/k8s/prometheus.rs +++ b/testsuite/forge/src/backend/k8s/prometheus.rs @@ -2,12 +2,23 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{create_k8s_client, K8sApi, ReadWrite, Result}; -use anyhow::bail; +use again::RetryPolicy; +use anyhow::{anyhow, bail}; use aptos_logger::info; use k8s_openapi::api::core::v1::Secret; -use prometheus_http_query::{response::PromqlResult, Client as PrometheusClient}; +use once_cell::sync::Lazy; +use prometheus_http_query::{ + response::{PromqlResult, Sample}, + Client as PrometheusClient, +}; use reqwest::{header, Client as HttpClient}; -use std::{collections::BTreeMap, sync::Arc}; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; + +static PROMETHEUS_RETRY_POLICY: Lazy = Lazy::new(|| { + RetryPolicy::exponential(Duration::from_millis(125)) + .with_max_retries(3) + .with_jitter(true) +}); pub async fn get_prometheus_client() -> Result { // read from the environment @@ -91,7 +102,7 @@ async fn create_prometheus_client_from_environment( pub fn construct_query_with_extra_labels( query: &str, - labels_map: BTreeMap, + labels_map: &BTreeMap, ) -> String { // edit the query string to insert swarm metadata let mut new_query = query.to_string(); @@ -123,13 +134,39 @@ pub async fn query_with_metadata( query: &str, time: Option, timeout: Option, - labels_map: BTreeMap, + labels_map: &BTreeMap, ) -> Result { let new_query = construct_query_with_extra_labels(query, labels_map); - match prom_client.query(&new_query, time, timeout).await { - Ok(r) => Ok(r), - Err(e) => bail!(e), - } + let new_query_ref = &new_query; + PROMETHEUS_RETRY_POLICY + .retry(move || prom_client.query(new_query_ref, time, timeout)) + .await + .map_err(|e| anyhow!("Failed to query prometheus for {}: {}", query, e)) +} + +pub async fn query_range_with_metadata( + prom_client: &PrometheusClient, + query: &str, + start_time: i64, + end_time: i64, + internal_secs: f64, + timeout: Option, + labels_map: &BTreeMap, +) -> Result> { + let new_query = construct_query_with_extra_labels(query, labels_map); + let new_query_ref = &new_query; + let r = PROMETHEUS_RETRY_POLICY + .retry(move || { + prom_client.query_range(new_query_ref, start_time, end_time, internal_secs, timeout) + }) + .await + .map_err(|e| anyhow!("Failed to query prometheus for {}: {}", query, e))?; + Ok(r.as_range() + .ok_or_else(|| anyhow!("Failed to get range from prometheus response"))? + .first() + .ok_or_else(|| anyhow!("Empty range vector returned from prometheus"))? + .samples() + .to_vec()) } #[cfg(test)] @@ -259,7 +296,7 @@ mod tests { labels_map.insert("a".to_string(), "a".to_string()); labels_map.insert("some_label".to_string(), "blabla".to_string()); let expected_query = r#"aptos_connections{a="a",some_label="blabla"}"#; - let new_query = construct_query_with_extra_labels(original_query, labels_map); + let new_query = construct_query_with_extra_labels(original_query, &labels_map); assert_eq!(expected_query, new_query); // test when existing labels @@ -268,7 +305,7 @@ mod tests { labels_map.insert("a".to_string(), "a".to_string()); labels_map.insert("some_label".to_string(), "blabla".to_string()); let expected_query = r#"aptos_connections{a="a",some_label="blabla",abc="123",def="456"}"#; - let new_query = construct_query_with_extra_labels(original_query, labels_map); + let new_query = construct_query_with_extra_labels(original_query, &labels_map); assert_eq!(expected_query, new_query); } } diff --git a/testsuite/forge/src/backend/k8s/swarm.rs b/testsuite/forge/src/backend/k8s/swarm.rs index 62b5c6968f80ae..19a681e98a41ea 100644 --- a/testsuite/forge/src/backend/k8s/swarm.rs +++ b/testsuite/forge/src/backend/k8s/swarm.rs @@ -7,7 +7,7 @@ use crate::{ get_free_port, get_stateful_set_image, install_public_fullnode, interface::system_metrics::{query_prometheus_system_metrics, SystemMetricsThreshold}, node::K8sNode, - prometheus::{self, query_with_metadata}, + prometheus::{self, query_range_with_metadata, query_with_metadata}, query_sequence_number, set_stateful_set_image_tag, uninstall_testnet_resources, ChainInfo, FullNode, K8sApi, Node, Result, Swarm, SwarmChaos, Validator, Version, HAPROXY_SERVICE_SUFFIX, REST_API_HAPROXY_SERVICE_PORT, REST_API_SERVICE_PORT, @@ -29,7 +29,10 @@ use kube::{ api::{Api, ListParams}, client::Client as K8sClient, }; -use prometheus_http_query::{response::PromqlResult, Client as PrometheusClient}; +use prometheus_http_query::{ + response::{PromqlResult, Sample}, + Client as PrometheusClient, +}; use regex::Regex; use std::{ collections::{BTreeMap, HashMap, HashSet}, @@ -402,7 +405,31 @@ impl Swarm for K8sSwarm { if let Some(c) = &self.prom_client { let mut labels_map = BTreeMap::new(); labels_map.insert("namespace".to_string(), self.kube_namespace.clone()); - return query_with_metadata(c, query, time, timeout, labels_map).await; + return query_with_metadata(c, query, time, timeout, &labels_map).await; + } + bail!("No prom client"); + } + + async fn query_range_metrics( + &self, + query: &str, + start_time: i64, + end_time: i64, + timeout: Option, + ) -> Result> { + if let Some(c) = &self.prom_client { + let mut labels_map = BTreeMap::new(); + labels_map.insert("namespace".to_string(), self.kube_namespace.clone()); + return query_range_with_metadata( + c, + query, + start_time, + end_time, + 30.0, + timeout, + &labels_map, + ) + .await; } bail!("No prom client"); } diff --git a/testsuite/forge/src/backend/local/swarm.rs b/testsuite/forge/src/backend/local/swarm.rs index ca387da8e9ae13..2da39fbb3b67ac 100644 --- a/testsuite/forge/src/backend/local/swarm.rs +++ b/testsuite/forge/src/backend/local/swarm.rs @@ -24,7 +24,7 @@ use aptos_sdk::{ PeerId, }, }; -use prometheus_http_query::response::PromqlResult; +use prometheus_http_query::response::{PromqlResult, Sample}; use std::{ collections::HashMap, fs, @@ -628,6 +628,16 @@ impl Swarm for LocalSwarm { todo!() } + async fn query_range_metrics( + &self, + _query: &str, + _start_time: i64, + _end_time: i64, + _timeout: Option, + ) -> Result> { + todo!() + } + async fn ensure_healthy_system_metrics( &mut self, _start_time: i64, diff --git a/testsuite/forge/src/interface/network.rs b/testsuite/forge/src/interface/network.rs index 243a34995e9810..65ddc603db6748 100644 --- a/testsuite/forge/src/interface/network.rs +++ b/testsuite/forge/src/interface/network.rs @@ -4,7 +4,7 @@ use super::Test; use crate::{ - success_criteria::{SuccessCriteria, SuccessCriteriaChecker}, + success_criteria::{LatencyBreakdown, SuccessCriteria, SuccessCriteriaChecker}, CoreContext, Result, Swarm, TestReport, }; use aptos_transaction_emitter_lib::{EmitJobRequest, TxnStats}; @@ -61,6 +61,7 @@ impl<'t> NetworkContext<'t> { &mut self, stats: &TxnStats, window: Duration, + latency_breakdown: &LatencyBreakdown, start_time: i64, end_time: i64, start_version: u64, @@ -73,6 +74,7 @@ impl<'t> NetworkContext<'t> { self.report, stats, window, + latency_breakdown, start_time, end_time, start_version, diff --git a/testsuite/forge/src/interface/swarm.rs b/testsuite/forge/src/interface/swarm.rs index ed9d2258c2e5d5..08b24fbbc95180 100644 --- a/testsuite/forge/src/interface/swarm.rs +++ b/testsuite/forge/src/interface/swarm.rs @@ -12,7 +12,7 @@ use aptos_logger::info; use aptos_rest_client::Client as RestClient; use aptos_sdk::types::PeerId; use futures::future::{join_all, try_join_all}; -use prometheus_http_query::response::PromqlResult; +use prometheus_http_query::response::{PromqlResult, Sample}; use std::time::{Duration, Instant}; use tokio::runtime::Runtime; @@ -100,6 +100,14 @@ pub trait Swarm: Sync { timeout: Option, ) -> Result; + async fn query_range_metrics( + &self, + query: &str, + start_time: i64, + end_time: i64, + timeout: Option, + ) -> Result>; + fn aptos_public_info(&mut self) -> AptosPublicInfo<'_> { self.chain_info().into_aptos_public_info() } diff --git a/testsuite/forge/src/interface/system_metrics.rs b/testsuite/forge/src/interface/system_metrics.rs index 349ac64001d86a..2d01fcdcdaa04d 100644 --- a/testsuite/forge/src/interface/system_metrics.rs +++ b/testsuite/forge/src/interface/system_metrics.rs @@ -15,14 +15,9 @@ pub struct SystemMetrics { memory_bytes_metrics: Vec, } -// This retry policy is used for important client calls necessary for setting -// up the test (e.g. account creation) and collecting its results (e.g. checking -// account sequence numbers). If these fail, the whole test fails. We do not use -// this for submitting transactions, as we have a way to handle when that fails. -// This retry policy means an operation will take 8 seconds at most. static RETRY_POLICY: Lazy = Lazy::new(|| { RetryPolicy::exponential(Duration::from_millis(125)) - .with_max_retries(6) + .with_max_retries(3) .with_jitter(true) }); @@ -37,18 +32,25 @@ impl SystemMetrics { #[derive(Default, Clone, Debug, Serialize)] pub struct MetricsThreshold { - max: usize, + max: f64, // % of the data point that can breach the max threshold max_breach_pct: usize, } impl MetricsThreshold { - pub fn new(max: usize, max_breach_pct: usize) -> Self { + pub fn new(max: f64, max_breach_pct: usize) -> Self { Self { max, max_breach_pct, } } + + pub fn new_gb(max: f64, max_breach_pct: usize) -> Self { + Self { + max: max * 1024.0 * 1024.0 * 1024.0, + max_breach_pct, + } + } } #[derive(Default, Clone, Debug, Serialize)] @@ -76,7 +78,7 @@ impl SystemMetricsThreshold { } } -fn ensure_metrics_threshold( +pub fn ensure_metrics_threshold( metrics_name: &str, threshold: &MetricsThreshold, metrics: &Vec, @@ -86,7 +88,7 @@ fn ensure_metrics_threshold( } let breach_count = metrics .iter() - .filter(|sample| sample.value() > threshold.max as f64) + .filter(|sample| sample.value() > threshold.max) .count(); let breach_pct = (breach_count * 100) / metrics.len(); if breach_pct > threshold.max_breach_pct { @@ -136,7 +138,7 @@ async fn get_prometheus_range_metrics( labels_map.insert("namespace".to_string(), namespace.to_string()); let response = client .query_range( - construct_query_with_extra_labels(query, labels_map), + construct_query_with_extra_labels(query, &labels_map), start_time, end_time, internal_secs, @@ -191,8 +193,8 @@ mod tests { use super::*; #[tokio::test] async fn test_empty_metrics_threshold() { - let cpu_threshold = MetricsThreshold::new(10, 30); - let memory_threshold = MetricsThreshold::new(100, 40); + let cpu_threshold = MetricsThreshold::new(10.0, 30); + let memory_threshold = MetricsThreshold::new(100.0, 40); let threshold = SystemMetricsThreshold::new(cpu_threshold, memory_threshold); let metrics = SystemMetrics::new(vec![], vec![]); threshold.ensure_threshold(&metrics).unwrap_err(); diff --git a/testsuite/forge/src/runner.rs b/testsuite/forge/src/runner.rs index 5bbec179491217..52a9e19c30b357 100644 --- a/testsuite/forge/src/runner.rs +++ b/testsuite/forge/src/runner.rs @@ -340,9 +340,9 @@ impl Default for ForgeConfig { .add_no_restarts() .add_system_metrics_threshold(SystemMetricsThreshold::new( // Check that we don't use more than 12 CPU cores for 30% of the time. - MetricsThreshold::new(12, 30), + MetricsThreshold::new(12.0, 30), // Check that we don't use more than 10 GB of memory for 30% of the time. - MetricsThreshold::new(10 * 1024 * 1024 * 1024, 30), + MetricsThreshold::new_gb(10.0, 30), )) }; Self { diff --git a/testsuite/forge/src/success_criteria.rs b/testsuite/forge/src/success_criteria.rs index 51ce299e180d4a..c59074ae079cd3 100644 --- a/testsuite/forge/src/success_criteria.rs +++ b/testsuite/forge/src/success_criteria.rs @@ -1,11 +1,15 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{system_metrics::SystemMetricsThreshold, Swarm, SwarmExt, TestReport}; +use crate::{ + system_metrics::{ensure_metrics_threshold, MetricsThreshold, SystemMetricsThreshold}, + Swarm, SwarmExt, TestReport, +}; use anyhow::{bail, Context}; use aptos::node::analyze::fetch_metadata::FetchMetadata; use aptos_sdk::types::PeerId; use aptos_transaction_emitter_lib::{TxnStats, TxnStatsRate}; +use prometheus_http_query::response::Sample; use std::time::Duration; #[derive(Clone, Debug)] @@ -26,6 +30,7 @@ pub enum LatencyType { pub struct SuccessCriteria { pub min_avg_tps: usize, latency_thresholds: Vec<(Duration, LatencyType)>, + latency_breakdown_thresholds: Option, check_no_restarts: bool, max_expired_tps: Option, max_failed_submission_tps: Option, @@ -40,6 +45,7 @@ impl SuccessCriteria { Self { min_avg_tps, latency_thresholds: Vec::new(), + latency_breakdown_thresholds: None, check_no_restarts: false, max_expired_tps: None, max_failed_submission_tps: None, @@ -84,6 +90,39 @@ impl SuccessCriteria { .push((Duration::from_secs_f32(threshold_s), latency_type)); self } + + pub fn add_latency_breakdown_threshold(mut self, threshold: LatencyBreakdownThreshold) -> Self { + self.latency_breakdown_thresholds = Some(threshold); + self + } +} + +#[derive(Default, Clone, Debug)] +pub struct LatencyBreakdown { + pub block_create_to_ordered_samples: Vec, + pub block_create_to_commit_samples: Vec, +} + +#[derive(Default, Clone, Debug)] +pub struct LatencyBreakdownThreshold { + pub block_create_to_ordered_threshold: MetricsThreshold, + pub block_create_to_commit_threshold: MetricsThreshold, +} + +impl LatencyBreakdownThreshold { + pub fn ensure_threshold(&self, metrics: &LatencyBreakdown) -> anyhow::Result<()> { + ensure_metrics_threshold( + "block_create_to_ordered", + &self.block_create_to_ordered_threshold, + &metrics.block_create_to_ordered_samples, + )?; + ensure_metrics_threshold( + "block_create_to_commit", + &self.block_create_to_commit_threshold, + &metrics.block_create_to_commit_samples, + )?; + Ok(()) + } } pub struct SuccessCriteriaChecker {} @@ -93,6 +132,7 @@ impl SuccessCriteriaChecker { success_criteria: &SuccessCriteria, _report: &mut TestReport, stats_rate: &TxnStatsRate, + latency_breakdown: Option<&LatencyBreakdown>, traffic_name: Option, ) -> anyhow::Result<()> { let traffic_name_addition = traffic_name @@ -110,6 +150,9 @@ impl SuccessCriteriaChecker { stats_rate, &traffic_name_addition, )?; + if let Some(latency_breakdown_thresholds) = &success_criteria.latency_breakdown_thresholds { + latency_breakdown_thresholds.ensure_threshold(latency_breakdown.unwrap())?; + } Ok(()) } @@ -119,6 +162,7 @@ impl SuccessCriteriaChecker { report: &mut TestReport, stats: &TxnStats, window: Duration, + latency_breakdown: &LatencyBreakdown, start_time: i64, end_time: i64, start_version: u64, @@ -139,6 +183,16 @@ impl SuccessCriteriaChecker { &"".to_string(), )?; + Self::check_latency( + &success_criteria.latency_thresholds, + &stats_rate, + &"".to_string(), + )?; + + if let Some(latency_breakdown_thresholds) = &success_criteria.latency_breakdown_thresholds { + latency_breakdown_thresholds.ensure_threshold(latency_breakdown)?; + } + if let Some(timeout) = success_criteria.wait_for_all_nodes_to_catchup { swarm .wait_for_all_nodes_to_catchup_to_next(timeout) diff --git a/testsuite/testcases/src/lib.rs b/testsuite/testcases/src/lib.rs index e41fe877af2122..9c78a88a94254c 100644 --- a/testsuite/testcases/src/lib.rs +++ b/testsuite/testcases/src/lib.rs @@ -27,10 +27,11 @@ pub mod validator_reboot_stress_test; use anyhow::Context; use aptos_forge::{ - EmitJobRequest, NetworkContext, NetworkTest, NodeExt, Result, Swarm, SwarmExt, Test, - TestReport, TxnEmitter, TxnStats, Version, + success_criteria::LatencyBreakdown, EmitJobRequest, NetworkContext, NetworkTest, NodeExt, + Result, Swarm, SwarmExt, Test, TestReport, TxnEmitter, TxnStats, Version, }; use aptos_logger::info; +use aptos_rest_client::Client as RestClient; use aptos_sdk::{transaction_builder::TransactionFactory, types::PeerId}; use futures::future::join_all; use rand::{rngs::StdRng, SeedableRng}; @@ -171,17 +172,26 @@ impl NetworkTest for dyn NetworkLoadTest { let emit_job_request = ctx.emit_job.clone(); let rng = SeedableRng::from_rng(ctx.core().rng())?; let duration = ctx.global_duration; - let (txn_stat, actual_test_duration, _ledger_transactions, _stats_by_phase) = self - .network_load_test( - ctx, - emit_job_request, - duration, - WARMUP_DURATION_FRACTION, - COOLDOWN_DURATION_FRACTION, - rng, - )?; - ctx.report - .report_txn_stats(self.name().to_string(), &txn_stat); + let stats_by_phase = self.network_load_test( + ctx, + emit_job_request, + duration, + WARMUP_DURATION_FRACTION, + COOLDOWN_DURATION_FRACTION, + rng, + )?; + + let phased = stats_by_phase.len() > 1; + for (phase, phase_stats) in stats_by_phase.iter().enumerate() { + ctx.report.report_txn_stats( + if phased { + format!("{}_phase_{}", self.name(), phase) + } else { + self.name().to_string() + }, + &phase_stats.emitter_stats, + ); + } let end_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -194,15 +204,18 @@ impl NetworkTest for dyn NetworkLoadTest { self.finish(ctx.swarm()) .context("finish NetworkLoadTest ")?; - ctx.check_for_success( - &txn_stat, - actual_test_duration, - start_timestamp as i64, - end_timestamp as i64, - start_version, - end_version, - ) - .context("check for success")?; + for (_phase, phase_stats) in stats_by_phase.into_iter().enumerate() { + ctx.check_for_success( + &phase_stats.emitter_stats, + phase_stats.actual_duration, + &phase_stats.latency_breakdown, + start_timestamp as i64, + end_timestamp as i64, + start_version, + end_version, + ) + .context("check for success")?; + } Ok(()) } @@ -217,7 +230,7 @@ impl dyn NetworkLoadTest { warmup_duration_fraction: f32, cooldown_duration_fraction: f32, rng: StdRng, - ) -> Result<(TxnStats, Duration, u64, Vec<(TxnStats, Duration)>)> { + ) -> Result> { let destination = self.setup(ctx).context("setup NetworkLoadTest")?; let nodes_to_send_load_to = destination.get_destination_nodes(ctx.swarm()); @@ -255,19 +268,11 @@ impl dyn NetworkLoadTest { job = rt.block_on(job.periodic_stat_forward(warmup_duration, 60)); info!("{}s warmup finished", warmup_duration.as_secs()); - let max_start_ledger_transactions = rt - .block_on(join_all( - clients.iter().map(|client| client.get_ledger_information()), - )) - .into_iter() - .filter(|r| r.is_ok()) - .map(|r| r.unwrap().into_inner()) - .map(|s| s.version - 2 * s.block_height) - .max(); - - let mut actual_phase_durations = Vec::new(); + let mut phase_timing = Vec::new(); + let mut phase_start_network_state = Vec::new(); let test_start = Instant::now(); for i in 0..stats_tracking_phases - 2 { + phase_start_network_state.push(rt.block_on(NetworkState::new(&clients))); job.start_next_phase(); if i > 0 { @@ -277,13 +282,13 @@ impl dyn NetworkLoadTest { stats_tracking_phases - 2, ); } - let phase_start = Instant::now(); + let phase_start = PhaseTimingStart::now(); let join_stats = rt.spawn(job.periodic_stat_forward(phase_duration, 60)); self.test(ctx.swarm, ctx.report, phase_duration) .context("test NetworkLoadTest")?; job = rt.block_on(join_stats).context("join stats")?; - actual_phase_durations.push(phase_start.elapsed()); + phase_timing.push(phase_start.elapsed()); } let actual_test_duration = test_start.elapsed(); info!( @@ -292,17 +297,9 @@ impl dyn NetworkLoadTest { actual_test_duration.as_secs() ); + phase_start_network_state.push(rt.block_on(NetworkState::new(&clients))); job.start_next_phase(); let cooldown_start = Instant::now(); - let max_end_ledger_transactions = rt - .block_on(join_all( - clients.iter().map(|client| client.get_ledger_information()), - )) - .into_iter() - .filter(|r| r.is_ok()) - .map(|r| r.unwrap().into_inner()) - .map(|s| s.version - 2 * s.block_height) - .max(); let cooldown_used = cooldown_start.elapsed(); if cooldown_used < cooldown_duration { @@ -320,38 +317,144 @@ impl dyn NetworkLoadTest { info!("Warmup stats: {}", stats_by_phase[0].rate()); let mut stats: Option = None; - let mut stats_and_duration_by_phase_filtered = Vec::new(); + let mut stats_by_phase_filtered = Vec::new(); for i in 0..stats_tracking_phases - 2 { - let cur = &stats_by_phase[1 + i]; + let next_i = i + 1; + let cur = &stats_by_phase[next_i]; info!("Test stats [test phase {}]: {}", i, cur.rate()); stats = if let Some(previous) = stats { Some(&previous + cur) } else { Some(cur.clone()) }; - stats_and_duration_by_phase_filtered.push((cur.clone(), actual_phase_durations[i])); + let latency_breakdown = rt.block_on(fetch_latency_breakdown( + ctx.swarm(), + phase_timing[i].start_unixtime_s, + phase_timing[i].end_unixtime_s, + ))?; + info!("latency_breakdown: {:?}", latency_breakdown); + stats_by_phase_filtered.push(LoadTestPhaseStats { + emitter_stats: cur.clone(), + actual_duration: phase_timing[i].duration, + phase_start_unixtime_s: phase_timing[i].start_unixtime_s, + phase_end_unixtime_s: phase_timing[i].end_unixtime_s, + ledger_transactions: NetworkState::ledger_transactions( + &phase_start_network_state[i], + &phase_start_network_state[next_i], + ), + latency_breakdown, + }); } info!("Cooldown stats: {}", stats_by_phase.last().unwrap().rate()); - let ledger_transactions = if let Some(end_t) = max_end_ledger_transactions { - if let Some(start_t) = max_start_ledger_transactions { - end_t - start_t - } else { - 0 - } + Ok(stats_by_phase_filtered) + } +} + +struct PhaseTimingStart { + now: Instant, + unixtime_s: u64, +} + +impl PhaseTimingStart { + fn now() -> PhaseTimingStart { + let now = Instant::now(); + let unixtime_s = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); + PhaseTimingStart { now, unixtime_s } + } + + fn elapsed(&self) -> PhaseTiming { + PhaseTiming { + duration: self.now.elapsed(), + start_unixtime_s: self.unixtime_s, + end_unixtime_s: SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(), + } + } +} + +struct PhaseTiming { + duration: Duration, + start_unixtime_s: u64, + end_unixtime_s: u64, +} + +pub(crate) struct NetworkState { + max_version_and_height: Option<(u64, u64)>, +} + +impl NetworkState { + pub async fn new(clients: &[RestClient]) -> NetworkState { + let max_version_and_height = + join_all(clients.iter().map(|client| client.get_ledger_information())) + .await + .into_iter() + .filter(|r| r.is_ok()) + .map(|r| r.unwrap().into_inner()) + .map(|s| (s.version, s.block_height)) + .max(); + NetworkState { + max_version_and_height, + } + } + + pub fn ledger_transactions(start: &NetworkState, end: &NetworkState) -> u64 { + if let (Some((end_version, end_height)), Some((start_version, start_height))) = + (end.max_version_and_height, start.max_version_and_height) + { + (end_version - end_height * 2) - (start_version - start_height * 2) } else { 0 - }; - - Ok(( - stats.unwrap(), - actual_test_duration, - ledger_transactions, - stats_and_duration_by_phase_filtered, - )) + } } } +async fn fetch_latency_breakdown( + swarm: &dyn Swarm, + start_time: u64, + end_time: u64, +) -> anyhow::Result { + let block_create_to_ordered_query = r#"quantile(0.67, rate(aptos_consensus_block_tracing_sum{container=~"validator", stage="ordered"}[30s]) / rate(aptos_consensus_block_tracing_count{container=~"validator", stage="ordered"}[30s]))"#; + let block_create_to_commit_query = r#"quantile(0.67, rate(aptos_consensus_block_tracing_sum{container=~"validator", stage="committed"}[30s]) / rate(aptos_consensus_block_tracing_count{container=~"validator", stage="committed"}[30s]))"#; + + let block_create_to_ordered_samples = swarm + .query_range_metrics( + block_create_to_ordered_query, + start_time as i64, + end_time as i64, + None, + ) + .await?; + + let block_create_to_commit_samples = swarm + .query_range_metrics( + block_create_to_commit_query, + start_time as i64, + end_time as i64, + None, + ) + .await?; + + Ok(LatencyBreakdown { + block_create_to_ordered_samples, + block_create_to_commit_samples, + }) +} + +pub struct LoadTestPhaseStats { + pub emitter_stats: TxnStats, + pub actual_duration: Duration, + pub phase_start_unixtime_s: u64, + pub phase_end_unixtime_s: u64, + pub ledger_transactions: u64, + pub latency_breakdown: LatencyBreakdown, +} + pub struct CompositeNetworkTest { // Wrapper tests - their setup and finish methods are called, before the test ones. // TODO don't know how to make this array, and have forge/main.rs work diff --git a/testsuite/testcases/src/load_vs_perf_benchmark.rs b/testsuite/testcases/src/load_vs_perf_benchmark.rs index 4105d838a4809c..4d3f8e29aa9fb6 100644 --- a/testsuite/testcases/src/load_vs_perf_benchmark.rs +++ b/testsuite/testcases/src/load_vs_perf_benchmark.rs @@ -4,7 +4,7 @@ use crate::NetworkLoadTest; use aptos_forge::{ args::TransactionTypeArg, - success_criteria::{SuccessCriteria, SuccessCriteriaChecker}, + success_criteria::{LatencyBreakdown, SuccessCriteria, SuccessCriteriaChecker}, EmitJobMode, EmitJobRequest, NetworkContext, NetworkTest, Result, Test, TxnStats, }; use aptos_logger::info; @@ -18,6 +18,7 @@ use tokio::runtime::Runtime; pub struct SingleRunStats { name: String, stats: TxnStats, + latency_breakdown: LatencyBreakdown, ledger_transactions: u64, actual_duration: Duration, } @@ -106,34 +107,31 @@ impl LoadVsPerfBenchmark { ) -> Result> { let rng = SeedableRng::from_rng(ctx.core().rng())?; let emit_job_request = workloads.configure(index, ctx.emit_job.clone()); - let (stats, actual_duration, ledger_transactions, stats_by_phase) = - self.test.network_load_test( - ctx, - emit_job_request, - duration, - // add larger warmup, as when we are exceeding the max load, - // it takes more time to fill mempool. - 0.2, - 0.05, - rng, - )?; - - let mut result = vec![SingleRunStats { - name: workloads.name(index), - stats, - ledger_transactions, - actual_duration, - }]; - - if stats_by_phase.len() > 1 { - for (i, (phase_stats, phase_duration)) in stats_by_phase.into_iter().enumerate() { - result.push(SingleRunStats { - name: format!("{}_phase_{}", workloads.name(index), i), - stats: phase_stats, - ledger_transactions, - actual_duration: phase_duration, - }); - } + let stats_by_phase = self.test.network_load_test( + ctx, + emit_job_request, + duration, + // add larger warmup, as when we are exceeding the max load, + // it takes more time to fill mempool. + 0.2, + 0.05, + rng, + )?; + + let mut result = vec![]; + let phased = stats_by_phase.len() > 1; + for (phase, phase_stats) in stats_by_phase.into_iter().enumerate() { + result.push(SingleRunStats { + name: if phased { + format!("{}_phase_{}", workloads.name(index), phase) + } else { + workloads.name(index) + }, + stats: phase_stats.emitter_stats, + latency_breakdown: phase_stats.latency_breakdown, + ledger_transactions: phase_stats.ledger_transactions, + actual_duration: phase_stats.actual_duration, + }); } Ok(result) @@ -193,6 +191,7 @@ impl NetworkTest for LoadVsPerfBenchmark { criteria, ctx.report, &rate, + Some(&result.latency_breakdown), Some(result.name.clone()), )?; } diff --git a/testsuite/testcases/src/two_traffics_test.rs b/testsuite/testcases/src/two_traffics_test.rs index 846099b3134f89..dd824174a4ee9a 100644 --- a/testsuite/testcases/src/two_traffics_test.rs +++ b/testsuite/testcases/src/two_traffics_test.rs @@ -70,6 +70,7 @@ impl NetworkLoadTest for TwoTrafficsTest { &self.inner_success_criteria, report, &rate, + None, Some("inner traffic".to_string()), )?; Ok(())