From e936f926ed4ab16ace38c47ab823dedb976b4542 Mon Sep 17 00:00:00 2001 From: Igor Date: Tue, 1 Aug 2023 11:40:22 -0700 Subject: [PATCH] refactoring to cleanup system metrics too --- testsuite/forge-cli/src/main.rs | 4 +- testsuite/forge/src/backend/k8s/prometheus.rs | 16 +- testsuite/forge/src/backend/k8s/swarm.rs | 24 -- testsuite/forge/src/backend/local/swarm.rs | 13 +- testsuite/forge/src/interface/mod.rs | 2 +- testsuite/forge/src/interface/network.rs | 3 +- .../forge/src/interface/prometheus_metrics.rs | 184 +++++++++++++ testsuite/forge/src/interface/swarm.rs | 10 +- .../forge/src/interface/system_metrics.rs | 202 -------------- testsuite/forge/src/runner.rs | 5 +- testsuite/forge/src/success_criteria.rs | 247 ++++++++++-------- testsuite/testcases/src/lib.rs | 80 +----- .../testcases/src/load_vs_perf_benchmark.rs | 13 +- 13 files changed, 357 insertions(+), 446 deletions(-) create mode 100644 testsuite/forge/src/interface/prometheus_metrics.rs delete mode 100644 testsuite/forge/src/interface/system_metrics.rs diff --git a/testsuite/forge-cli/src/main.rs b/testsuite/forge-cli/src/main.rs index b3c297f9f43f7..00c4834652020 100644 --- a/testsuite/forge-cli/src/main.rs +++ b/testsuite/forge-cli/src/main.rs @@ -7,9 +7,9 @@ use aptos_config::config::{ChainHealthBackoffValues, ConsensusConfig, PipelineBa use aptos_forge::{ args::TransactionTypeArg, success_criteria::{ - LatencyBreakdownThreshold, LatencyType, StateProgressThreshold, SuccessCriteria, + LatencyBreakdownThreshold, LatencyType, MetricsThreshold, StateProgressThreshold, + SuccessCriteria, SystemMetricsThreshold, }, - system_metrics::{MetricsThreshold, SystemMetricsThreshold}, ForgeConfig, Options, *, }; use aptos_logger::{info, Level}; diff --git a/testsuite/forge/src/backend/k8s/prometheus.rs b/testsuite/forge/src/backend/k8s/prometheus.rs index 9e11d8a11e456..f59f6810888d7 100644 --- a/testsuite/forge/src/backend/k8s/prometheus.rs +++ b/testsuite/forge/src/backend/k8s/prometheus.rs @@ -169,7 +169,7 @@ pub async fn query_range_with_metadata( new_query ) })?; - Ok(r.as_range() + let range = r.as_range() .ok_or_else(|| { anyhow!( "Failed to get range from prometheus response. start={}, end={}, query={}", @@ -177,7 +177,19 @@ pub async fn query_range_with_metadata( end_time, new_query ) - })? + })?; + info!("For Query {} got range {:?}", new_query, range); + if range.len() != 1 { + bail!( + "Expected only one range vector from prometheus, recieved {} ({:?}). start={}, end={}, query={}", + range.len(), + range, + start_time, + end_time, + new_query + ); + } + Ok(range .first() .ok_or_else(|| { anyhow!( diff --git a/testsuite/forge/src/backend/k8s/swarm.rs b/testsuite/forge/src/backend/k8s/swarm.rs index 19a681e98a41e..fb6b48e33ba4f 100644 --- a/testsuite/forge/src/backend/k8s/swarm.rs +++ b/testsuite/forge/src/backend/k8s/swarm.rs @@ -5,7 +5,6 @@ use crate::{ check_for_container_restart, create_k8s_client, delete_all_chaos, get_default_pfn_node_config, get_free_port, get_stateful_set_image, install_public_fullnode, - interface::system_metrics::{query_prometheus_system_metrics, SystemMetricsThreshold}, node::K8sNode, prometheus::{self, query_range_with_metadata, query_with_metadata}, query_sequence_number, set_stateful_set_image_tag, uninstall_testnet_resources, ChainInfo, @@ -434,29 +433,6 @@ impl Swarm for K8sSwarm { bail!("No prom client"); } - async fn ensure_healthy_system_metrics( - &mut self, - start_time: i64, - end_time: i64, - threshold: SystemMetricsThreshold, - ) -> Result<()> { - if let Some(c) = &self.prom_client { - let system_metrics = query_prometheus_system_metrics( - c, - start_time, - end_time, - 30.0, - &self.kube_namespace, - ) - .await?; - threshold.ensure_threshold(&system_metrics)?; - info!("System metrics are healthy"); - Ok(()) - } else { - bail!("No prom client"); - } - } - fn chain_info_for_node(&mut self, idx: usize) -> ChainInfo<'_> { let rest_api_url = self.get_rest_api_url(idx); let inspection_service_url = self.get_inspection_service_url(idx); diff --git a/testsuite/forge/src/backend/local/swarm.rs b/testsuite/forge/src/backend/local/swarm.rs index 2da39fbb3b67a..c0134e8faf890 100644 --- a/testsuite/forge/src/backend/local/swarm.rs +++ b/testsuite/forge/src/backend/local/swarm.rs @@ -3,8 +3,8 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - interface::system_metrics::SystemMetricsThreshold, ChainInfo, FullNode, HealthCheckError, - LocalNode, LocalVersion, Node, Swarm, SwarmChaos, SwarmExt, Validator, Version, + ChainInfo, FullNode, HealthCheckError, LocalNode, LocalVersion, Node, Swarm, SwarmChaos, + SwarmExt, Validator, Version, }; use anyhow::{anyhow, bail, Result}; use aptos::common::types::EncodingType; @@ -638,15 +638,6 @@ impl Swarm for LocalSwarm { todo!() } - async fn ensure_healthy_system_metrics( - &mut self, - _start_time: i64, - _end_time: i64, - _threshold: SystemMetricsThreshold, - ) -> Result<()> { - todo!() - } - fn chain_info_for_node(&mut self, idx: usize) -> ChainInfo<'_> { let rest_api_url = self .validators() diff --git a/testsuite/forge/src/interface/mod.rs b/testsuite/forge/src/interface/mod.rs index 2e1e368ec222f..5a6507219e0ef 100644 --- a/testsuite/forge/src/interface/mod.rs +++ b/testsuite/forge/src/interface/mod.rs @@ -19,7 +19,7 @@ pub use chaos::*; mod node; pub use node::*; mod chain_info; -pub mod system_metrics; +pub mod prometheus_metrics; use aptos_framework::ReleaseBundle; pub use chain_info::*; diff --git a/testsuite/forge/src/interface/network.rs b/testsuite/forge/src/interface/network.rs index 65ddc603db674..1d4f87fc2a9f4 100644 --- a/testsuite/forge/src/interface/network.rs +++ b/testsuite/forge/src/interface/network.rs @@ -4,7 +4,8 @@ use super::Test; use crate::{ - success_criteria::{LatencyBreakdown, SuccessCriteria, SuccessCriteriaChecker}, + prometheus_metrics::LatencyBreakdown, + success_criteria::{SuccessCriteria, SuccessCriteriaChecker}, CoreContext, Result, Swarm, TestReport, }; use aptos_transaction_emitter_lib::{EmitJobRequest, TxnStats}; diff --git a/testsuite/forge/src/interface/prometheus_metrics.rs b/testsuite/forge/src/interface/prometheus_metrics.rs new file mode 100644 index 0000000000000..85333bca2993f --- /dev/null +++ b/testsuite/forge/src/interface/prometheus_metrics.rs @@ -0,0 +1,184 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::Swarm; +use prometheus_http_query::response::Sample; +use std::{collections::BTreeMap, fmt}; + +#[derive(Clone)] +pub struct MetricSamples(Vec); + +impl MetricSamples { + pub fn new(samples: Vec) -> Self { + Self(samples) + } + + pub fn max_sample(&self) -> f64 { + self.0 + .iter() + .map(|s| s.value()) + .max_by(|a, b| a.partial_cmp(b).unwrap()) + .unwrap_or_default() + } + + pub fn get(&self) -> &Vec { + &self.0 + } +} + +impl fmt::Debug for MetricSamples { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "{:?}", + self.0 + .iter() + .map(|s| (s.value(), s.timestamp())) + .collect::>() + ) + } +} + +#[derive(Clone, Debug)] +pub struct SystemMetrics { + pub cpu_core_metrics: MetricSamples, + pub memory_bytes_metrics: MetricSamples, +} + +impl SystemMetrics { + pub fn new(cpu_metrics: Vec, memory_metrics: Vec) -> Self { + Self { + cpu_core_metrics: MetricSamples::new(cpu_metrics), + memory_bytes_metrics: MetricSamples::new(memory_metrics), + } + } +} + +pub async fn fetch_system_metrics( + swarm: &dyn Swarm, + start_time: i64, + end_time: i64, +) -> anyhow::Result { + let cpu_query = r#"avg(rate(container_cpu_usage_seconds_total{container=~"validator"}[30s]))"#; + let memory_query = r#"avg(container_memory_rss{container=~"validator"})"#; + + let cpu_samples = swarm + .query_range_metrics(cpu_query, start_time, end_time, None) + .await?; + + let memory_samples = swarm + .query_range_metrics(memory_query, start_time, end_time, None) + .await?; + + Ok(SystemMetrics::new(cpu_samples, memory_samples)) +} + +#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] +pub enum LatencyBreakdownSlice { + QsBatchToPos, + QsPosToProposal, + ConsensusProposalToOrdered, + ConsensusOrderedToCommit, + ConsensusProposalToCommit, +} + +#[derive(Clone, Debug)] +pub struct LatencyBreakdown(BTreeMap); + +impl LatencyBreakdown { + pub fn new(latency: BTreeMap) -> Self { + Self(latency) + } + + pub fn get_samples(&self, slice: &LatencyBreakdownSlice) -> &MetricSamples { + self.0 + .get(slice) + .unwrap_or_else(|| panic!("Missing latency breakdown for {:?}", slice)) + } +} + +pub async fn fetch_latency_breakdown( + swarm: &dyn Swarm, + start_time: u64, + end_time: u64, +) -> anyhow::Result { + // Averaging over 1m, and skipping data points at the start that would take averages outside of the interval. + let start_time_adjusted = start_time + 60; + let consensus_proposal_to_ordered_query = r#"quantile(0.67, rate(aptos_consensus_block_tracing_sum{role=~"validator", stage="ordered"}[1m]) / rate(aptos_consensus_block_tracing_count{role=~"validator", stage="ordered"}[1m]))"#; + let consensus_proposal_to_commit_query = r#"quantile(0.67, rate(aptos_consensus_block_tracing_sum{role=~"validator", stage="committed"}[1m]) / rate(aptos_consensus_block_tracing_count{role=~"validator", stage="committed"}[1m]))"#; + + let qs_batch_to_pos_query = r#"sum(rate(quorum_store_batch_to_PoS_duration_sum{role=~"validator"}[1m])) / sum(rate(quorum_store_batch_to_PoS_duration_count{role=~"validator"}[1m]))"#; + let qs_pos_to_proposal_query = r#"sum(rate(quorum_store_pos_to_pull_sum{role=~"validator"}[1m])) / sum(rate(quorum_store_pos_to_pull_count{role=~"validator"}[1m]))"#; + + let consensus_proposal_to_ordered_samples = swarm + .query_range_metrics( + consensus_proposal_to_ordered_query, + start_time_adjusted as i64, + end_time as i64, + None, + ) + .await?; + + let consensus_proposal_to_commit_samples = swarm + .query_range_metrics( + consensus_proposal_to_commit_query, + start_time_adjusted as i64, + end_time as i64, + None, + ) + .await?; + + let consensus_ordered_to_commit_samples = swarm + .query_range_metrics( + &format!( + "{} - {}", + consensus_proposal_to_commit_query, consensus_proposal_to_ordered_query + ), + start_time_adjusted as i64, + end_time as i64, + None, + ) + .await?; + + let qs_batch_to_pos_samples = swarm + .query_range_metrics( + qs_batch_to_pos_query, + start_time_adjusted as i64, + end_time as i64, + None, + ) + .await?; + + let qs_pos_to_proposal_samples = swarm + .query_range_metrics( + qs_pos_to_proposal_query, + start_time_adjusted as i64, + end_time as i64, + None, + ) + .await?; + + let mut samples = BTreeMap::new(); + samples.insert( + LatencyBreakdownSlice::QsBatchToPos, + MetricSamples::new(qs_batch_to_pos_samples), + ); + samples.insert( + LatencyBreakdownSlice::QsPosToProposal, + MetricSamples::new(qs_pos_to_proposal_samples), + ); + samples.insert( + LatencyBreakdownSlice::ConsensusProposalToOrdered, + MetricSamples::new(consensus_proposal_to_ordered_samples), + ); + samples.insert( + LatencyBreakdownSlice::ConsensusOrderedToCommit, + MetricSamples::new(consensus_ordered_to_commit_samples), + ); + samples.insert( + LatencyBreakdownSlice::ConsensusProposalToCommit, + MetricSamples::new(consensus_proposal_to_commit_samples), + ); + + Ok(LatencyBreakdown::new(samples)) +} diff --git a/testsuite/forge/src/interface/swarm.rs b/testsuite/forge/src/interface/swarm.rs index 08b24fbbc9518..922d095957a07 100644 --- a/testsuite/forge/src/interface/swarm.rs +++ b/testsuite/forge/src/interface/swarm.rs @@ -3,8 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - interface::system_metrics::SystemMetricsThreshold, AptosPublicInfo, ChainInfo, FullNode, - NodeExt, Result, SwarmChaos, Validator, Version, + AptosPublicInfo, ChainInfo, FullNode, NodeExt, Result, SwarmChaos, Validator, Version, }; use anyhow::{anyhow, bail}; use aptos_config::config::NodeConfig; @@ -85,13 +84,6 @@ pub trait Swarm: Sync { async fn ensure_no_validator_restart(&self) -> Result<()>; async fn ensure_no_fullnode_restart(&self) -> Result<()>; - async fn ensure_healthy_system_metrics( - &mut self, - start_time: i64, - end_time: i64, - threshold: SystemMetricsThreshold, - ) -> Result<()>; - // Get prometheus metrics from the swarm async fn query_metrics( &self, diff --git a/testsuite/forge/src/interface/system_metrics.rs b/testsuite/forge/src/interface/system_metrics.rs deleted file mode 100644 index 2d01fcdcdaa04..0000000000000 --- a/testsuite/forge/src/interface/system_metrics.rs +++ /dev/null @@ -1,202 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::prometheus::construct_query_with_extra_labels; -use again::RetryPolicy; -use anyhow::{anyhow, bail}; -use once_cell::sync::Lazy; -use prometheus_http_query::{response::Sample, Client as PrometheusClient}; -use serde::Serialize; -use std::{collections::BTreeMap, time::Duration}; - -#[derive(Default, Clone, Debug)] -pub struct SystemMetrics { - cpu_core_metrics: Vec, - memory_bytes_metrics: Vec, -} - -static RETRY_POLICY: Lazy = Lazy::new(|| { - RetryPolicy::exponential(Duration::from_millis(125)) - .with_max_retries(3) - .with_jitter(true) -}); - -impl SystemMetrics { - pub fn new(cpu_metrics: Vec, memory_metrics: Vec) -> Self { - Self { - cpu_core_metrics: cpu_metrics, - memory_bytes_metrics: memory_metrics, - } - } -} - -#[derive(Default, Clone, Debug, Serialize)] -pub struct MetricsThreshold { - max: f64, - // % of the data point that can breach the max threshold - max_breach_pct: usize, -} - -impl MetricsThreshold { - pub fn new(max: f64, max_breach_pct: usize) -> Self { - Self { - max, - max_breach_pct, - } - } - - pub fn new_gb(max: f64, max_breach_pct: usize) -> Self { - Self { - max: max * 1024.0 * 1024.0 * 1024.0, - max_breach_pct, - } - } -} - -#[derive(Default, Clone, Debug, Serialize)] -pub struct SystemMetricsThreshold { - cpu_threshold: MetricsThreshold, - memory_threshold: MetricsThreshold, -} - -impl SystemMetricsThreshold { - pub fn ensure_threshold(&self, metrics: &SystemMetrics) -> anyhow::Result<()> { - ensure_metrics_threshold("cpu", &self.cpu_threshold, &metrics.cpu_core_metrics)?; - ensure_metrics_threshold( - "memory", - &self.memory_threshold, - &metrics.memory_bytes_metrics, - )?; - Ok(()) - } - - pub fn new(cpu_threshold: MetricsThreshold, memory_threshold: MetricsThreshold) -> Self { - Self { - cpu_threshold, - memory_threshold, - } - } -} - -pub fn ensure_metrics_threshold( - metrics_name: &str, - threshold: &MetricsThreshold, - metrics: &Vec, -) -> anyhow::Result<()> { - if metrics.is_empty() { - bail!("Empty metrics provided"); - } - let breach_count = metrics - .iter() - .filter(|sample| sample.value() > threshold.max) - .count(); - let breach_pct = (breach_count * 100) / metrics.len(); - if breach_pct > threshold.max_breach_pct { - bail!( - "{:?} metric violated threshold of {:?}, max_breach_pct: {:?}, breach_pct: {:?} ", - metrics_name, - threshold.max, - threshold.max_breach_pct, - breach_pct - ); - } - Ok(()) -} - -async fn query_prometheus_range_metrics( - query: &str, - client: &PrometheusClient, - start_time: i64, - end_time: i64, - internal_secs: f64, - namespace: &str, -) -> anyhow::Result> { - RETRY_POLICY - .retry(move || { - get_prometheus_range_metrics( - query, - client, - start_time, - end_time, - internal_secs, - namespace, - ) - }) - .await - .map_err(|e| anyhow!("Failed to query prometheus for system metrics: {}", e)) -} - -async fn get_prometheus_range_metrics( - query: &str, - client: &PrometheusClient, - start_time: i64, - end_time: i64, - internal_secs: f64, - namespace: &str, -) -> anyhow::Result> { - let mut labels_map = BTreeMap::new(); - labels_map.insert("namespace".to_string(), namespace.to_string()); - let response = client - .query_range( - construct_query_with_extra_labels(query, &labels_map), - start_time, - end_time, - internal_secs, - None, - ) - .await?; - Ok(response - .as_range() - .ok_or_else(|| anyhow!("Failed to get range from prometheus response"))? - .first() - .ok_or_else(|| anyhow!("Empty range vector returned from prometheus"))? - .samples() - .to_vec()) -} - -pub async fn query_prometheus_system_metrics( - client: &PrometheusClient, - start_time: i64, - end_time: i64, - internal_secs: f64, - namespace: &str, -) -> anyhow::Result { - let cpu_query = r#"avg(rate(container_cpu_usage_seconds_total{container=~"validator"}[30s]))"#; - let memory_query = r#"avg(container_memory_rss{container=~"validator"})"#; - - let cpu_samples = query_prometheus_range_metrics( - cpu_query, - client, - start_time, - end_time, - internal_secs, - namespace, - ) - .await?; - - let memory_samples = query_prometheus_range_metrics( - memory_query, - client, - start_time, - end_time, - internal_secs, - namespace, - ) - .await?; - - Ok(SystemMetrics::new(cpu_samples, memory_samples)) -} - -#[cfg(test)] -mod tests { - - use super::*; - #[tokio::test] - async fn test_empty_metrics_threshold() { - let cpu_threshold = MetricsThreshold::new(10.0, 30); - let memory_threshold = MetricsThreshold::new(100.0, 40); - let threshold = SystemMetricsThreshold::new(cpu_threshold, memory_threshold); - let metrics = SystemMetrics::new(vec![], vec![]); - threshold.ensure_threshold(&metrics).unwrap_err(); - } -} diff --git a/testsuite/forge/src/runner.rs b/testsuite/forge/src/runner.rs index 52a9e19c30b35..f1693a63f2214 100644 --- a/testsuite/forge/src/runner.rs +++ b/testsuite/forge/src/runner.rs @@ -2,11 +2,10 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::*; // TODO going to remove random seed once cluster deployment supports re-run genesis use crate::{ - success_criteria::SuccessCriteria, - system_metrics::{MetricsThreshold, SystemMetricsThreshold}, + success_criteria::{MetricsThreshold, SuccessCriteria, SystemMetricsThreshold}, + *, }; use anyhow::{bail, format_err, Error, Result}; use aptos_framework::ReleaseBundle; diff --git a/testsuite/forge/src/success_criteria.rs b/testsuite/forge/src/success_criteria.rs index 167993d7ced86..a6ab3a782dd0a 100644 --- a/testsuite/forge/src/success_criteria.rs +++ b/testsuite/forge/src/success_criteria.rs @@ -2,7 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - system_metrics::{ensure_metrics_threshold, MetricsThreshold, SystemMetricsThreshold}, + prometheus_metrics::{ + fetch_system_metrics, LatencyBreakdown, LatencyBreakdownSlice, SystemMetrics, + }, Swarm, SwarmExt, TestReport, }; use anyhow::{bail, Context}; @@ -10,7 +12,7 @@ use aptos::node::analyze::fetch_metadata::FetchMetadata; use aptos_sdk::types::PeerId; use aptos_transaction_emitter_lib::{TxnStats, TxnStatsRate}; use prometheus_http_query::response::Sample; -use std::{collections::BTreeMap, fmt, time::Duration}; +use std::{collections::BTreeMap, time::Duration}; #[derive(Clone, Debug)] pub struct StateProgressThreshold { @@ -26,6 +28,118 @@ pub enum LatencyType { P99, } +#[derive(Default, Clone, Debug)] +pub struct MetricsThreshold { + max: f64, + // % of the data point that can breach the max threshold + max_breach_pct: usize, +} + +impl MetricsThreshold { + pub fn new(max: f64, max_breach_pct: usize) -> Self { + Self { + max, + max_breach_pct, + } + } + + pub fn new_gb(max: f64, max_breach_pct: usize) -> Self { + Self { + max: max * 1024.0 * 1024.0 * 1024.0, + max_breach_pct, + } + } + + pub fn ensure_metrics_threshold( + &self, + metrics_name: &str, + metrics: &Vec, + ) -> anyhow::Result<()> { + if metrics.is_empty() { + bail!("Empty metrics provided"); + } + let breach_count = metrics + .iter() + .filter(|sample| sample.value() > self.max) + .count(); + let breach_pct = (breach_count * 100) / metrics.len(); + if breach_pct > self.max_breach_pct { + bail!( + "{:?} metric violated threshold of {:?}, max_breach_pct: {:?}, breach_pct: {:?} ", + metrics_name, + self.max, + self.max_breach_pct, + breach_pct + ); + } + Ok(()) + } +} + +#[derive(Default, Clone, Debug)] +pub struct SystemMetricsThreshold { + cpu_threshold: MetricsThreshold, + memory_threshold: MetricsThreshold, +} + +impl SystemMetricsThreshold { + pub fn ensure_threshold(&self, metrics: &SystemMetrics) -> anyhow::Result<()> { + self.cpu_threshold + .ensure_metrics_threshold("cpu", metrics.cpu_core_metrics.get())?; + self.memory_threshold + .ensure_metrics_threshold("memory", metrics.memory_bytes_metrics.get())?; + Ok(()) + } + + pub fn new(cpu_threshold: MetricsThreshold, memory_threshold: MetricsThreshold) -> Self { + Self { + cpu_threshold, + memory_threshold, + } + } +} + +#[derive(Clone, Debug)] +pub struct LatencyBreakdownThreshold { + pub thresholds: BTreeMap, +} + +impl LatencyBreakdownThreshold { + pub fn new_strict( + qs_batch_to_pos_threshold: f64, + qs_pos_to_proposal_threshold: f64, + consensus_proposal_to_ordered_threshold: f64, + consensus_ordered_to_commit_threshold: f64, + ) -> Self { + let mut thresholds = BTreeMap::new(); + thresholds.insert( + LatencyBreakdownSlice::QsBatchToPos, + MetricsThreshold::new(qs_batch_to_pos_threshold, 0), + ); + thresholds.insert( + LatencyBreakdownSlice::QsPosToProposal, + MetricsThreshold::new(qs_pos_to_proposal_threshold, 0), + ); + thresholds.insert( + LatencyBreakdownSlice::ConsensusProposalToOrdered, + MetricsThreshold::new(consensus_proposal_to_ordered_threshold, 0), + ); + thresholds.insert( + LatencyBreakdownSlice::ConsensusOrderedToCommit, + MetricsThreshold::new(consensus_ordered_to_commit_threshold, 0), + ); + Self { thresholds } + } + + pub fn ensure_threshold(&self, metrics: &LatencyBreakdown) -> anyhow::Result<()> { + for (slice, threshold) in &self.thresholds { + let samples = metrics.get_samples(slice); + threshold.ensure_metrics_threshold(&format!("{:?}", slice), samples.get())?; + } + Ok(()) + } +} + #[derive(Default, Clone, Debug)] pub struct SuccessCriteria { pub min_avg_tps: usize, @@ -97,105 +211,6 @@ impl SuccessCriteria { } } -#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] -pub enum LatencyBreakdownSlice { - QsBatchToPos, - QsPosToProposal, - ConsensusProposalToOrdered, - ConsensusOrderedToCommit, -} - -#[derive(Clone)] -pub struct LatencySamples(Vec); - -impl LatencySamples { - pub fn new(samples: Vec) -> Self { - Self(samples) - } - - pub fn max_sample(&self) -> f64 { - self.0 - .iter() - .map(|s| s.value()) - .max_by(|a, b| a.partial_cmp(b).unwrap()) - .unwrap_or_default() - } - - pub fn get(&self) -> &Vec { - &self.0 - } -} - -impl fmt::Debug for LatencySamples { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "{:?}", - self.0 - .iter() - .map(|s| (s.value(), s.timestamp())) - .collect::>() - ) - } -} - -#[derive(Clone, Debug)] -pub struct LatencyBreakdown(BTreeMap); - -impl LatencyBreakdown { - pub fn new(latency: BTreeMap) -> Self { - Self(latency) - } - - pub fn get_samples(&self, slice: LatencyBreakdownSlice) -> &LatencySamples { - self.0.get(&slice).unwrap() - } -} - -#[derive(Clone, Debug)] -pub struct LatencyBreakdownThreshold { - pub thresholds: BTreeMap, -} - -impl LatencyBreakdownThreshold { - pub fn new_strict( - qs_batch_to_pos_threshold: f64, - qs_pos_to_proposal_threshold: f64, - consensus_proposal_to_ordered_threshold: f64, - consensus_ordered_to_commit_threshold: f64, - ) -> Self { - let mut thresholds = BTreeMap::new(); - thresholds.insert( - LatencyBreakdownSlice::QsBatchToPos, - MetricsThreshold::new(qs_batch_to_pos_threshold, 0), - ); - thresholds.insert( - LatencyBreakdownSlice::QsPosToProposal, - MetricsThreshold::new(qs_pos_to_proposal_threshold, 0), - ); - thresholds.insert( - LatencyBreakdownSlice::ConsensusProposalToOrdered, - MetricsThreshold::new(consensus_proposal_to_ordered_threshold, 0), - ); - thresholds.insert( - LatencyBreakdownSlice::ConsensusOrderedToCommit, - MetricsThreshold::new(consensus_ordered_to_commit_threshold, 0), - ); - Self { thresholds } - } - - pub fn ensure_threshold(&self, metrics: &LatencyBreakdown) -> anyhow::Result<()> { - for (slice, threshold) in &self.thresholds { - let samples = metrics - .0 - .get(slice) - .ok_or_else(|| anyhow::anyhow!("Missing latency breakdown for {:?}", slice))?; - ensure_metrics_threshold(&format!("{:?}", slice), threshold, samples.get())?; - } - Ok(()) - } -} - pub struct SuccessCriteriaChecker {} impl SuccessCriteriaChecker { @@ -282,12 +297,8 @@ impl SuccessCriteriaChecker { .context("Failed ensuring no fullnode restarted")?; } - // TODO(skedia) Add end-to-end latency from counters after we have support for querying prometheus - // latency (in addition to checking latency from txn-emitter) - if let Some(system_metrics_threshold) = success_criteria.system_metrics_threshold.clone() { - swarm - .ensure_healthy_system_metrics(start_time, end_time, system_metrics_threshold) + Self::check_system_metrics(swarm, start_time, end_time, system_metrics_threshold) .await?; } @@ -522,4 +533,28 @@ impl SuccessCriteriaChecker { Ok(()) } } + + async fn check_system_metrics( + swarm: &mut dyn Swarm, + start_time: i64, + end_time: i64, + threshold: SystemMetricsThreshold, + ) -> anyhow::Result<()> { + let system_metrics = fetch_system_metrics(swarm, start_time, end_time).await?; + threshold.ensure_threshold(&system_metrics) + } +} + +#[cfg(test)] +mod tests { + + use super::*; + #[tokio::test] + async fn test_empty_metrics_threshold() { + let cpu_threshold = MetricsThreshold::new(10.0, 30); + let memory_threshold = MetricsThreshold::new(100.0, 40); + let threshold = SystemMetricsThreshold::new(cpu_threshold, memory_threshold); + let metrics = SystemMetrics::new(vec![], vec![]); + threshold.ensure_threshold(&metrics).unwrap_err(); + } } diff --git a/testsuite/testcases/src/lib.rs b/testsuite/testcases/src/lib.rs index e8b55d5816bca..fbdf6a9a58ef6 100644 --- a/testsuite/testcases/src/lib.rs +++ b/testsuite/testcases/src/lib.rs @@ -27,7 +27,7 @@ pub mod validator_reboot_stress_test; use anyhow::Context; use aptos_forge::{ - success_criteria::{LatencyBreakdown, LatencyBreakdownSlice, LatencySamples}, + prometheus_metrics::{fetch_latency_breakdown, LatencyBreakdown}, EmitJobRequest, NetworkContext, NetworkTest, NodeExt, Result, Swarm, SwarmExt, Test, TestReport, TxnEmitter, TxnStats, Version, }; @@ -36,10 +36,7 @@ use aptos_rest_client::Client as RestClient; use aptos_sdk::{transaction_builder::TransactionFactory, types::PeerId}; use futures::future::join_all; use rand::{rngs::StdRng, SeedableRng}; -use std::{ - collections::BTreeMap, - time::{Duration, Instant, SystemTime, UNIX_EPOCH}, -}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio::runtime::Runtime; const WARMUP_DURATION_FRACTION: f32 = 0.07; @@ -426,79 +423,6 @@ impl NetworkState { } } -async fn fetch_latency_breakdown( - swarm: &dyn Swarm, - start_time: u64, - end_time: u64, -) -> anyhow::Result { - // Averaging over 1m, and skipping data points at the start that would take averages outside of the interval. - let start_time_adjusted = start_time + 60; - let consensus_proposal_to_ordered_query = r#"quantile(0.67, rate(aptos_consensus_block_tracing_sum{role=~"validator", stage="ordered"}[1m]) / rate(aptos_consensus_block_tracing_count{role=~"validator", stage="ordered"}[1m]))"#; - let consensus_proposal_to_commit_query = r#"quantile(0.67, rate(aptos_consensus_block_tracing_sum{role=~"validator", stage="committed"}[1m]) / rate(aptos_consensus_block_tracing_count{role=~"validator", stage="committed"}[1m]))"#; - - let qs_batch_to_pos_query = r#"sum(rate(quorum_store_batch_to_PoS_duration_sum{role=~"validator"}[1m])) / sum(rate(quorum_store_batch_to_PoS_duration_count{role=~"validator"}[1m]))"#; - let qs_pos_to_proposal_query = r#"sum(rate(quorum_store_pos_to_pull_sum{role=~"validator"}[1m])) / sum(rate(quorum_store_pos_to_pull_count{role=~"validator"}[1m]))"#; - - let consensus_proposal_to_ordered_samples = swarm - .query_range_metrics( - consensus_proposal_to_ordered_query, - start_time_adjusted as i64, - end_time as i64, - None, - ) - .await?; - - let consensus_ordered_to_commit_samples = swarm - .query_range_metrics( - &format!( - "{} - {}", - consensus_proposal_to_commit_query, consensus_proposal_to_ordered_query - ), - start_time_adjusted as i64, - end_time as i64, - None, - ) - .await?; - - let qs_batch_to_pos_samples = swarm - .query_range_metrics( - qs_batch_to_pos_query, - start_time_adjusted as i64, - end_time as i64, - None, - ) - .await?; - - let qs_pos_to_proposal_samples = swarm - .query_range_metrics( - qs_pos_to_proposal_query, - start_time_adjusted as i64, - end_time as i64, - None, - ) - .await?; - - let mut samples = BTreeMap::new(); - samples.insert( - LatencyBreakdownSlice::QsBatchToPos, - LatencySamples::new(qs_batch_to_pos_samples), - ); - samples.insert( - LatencyBreakdownSlice::QsPosToProposal, - LatencySamples::new(qs_pos_to_proposal_samples), - ); - samples.insert( - LatencyBreakdownSlice::ConsensusProposalToOrdered, - LatencySamples::new(consensus_proposal_to_ordered_samples), - ); - samples.insert( - LatencyBreakdownSlice::ConsensusOrderedToCommit, - LatencySamples::new(consensus_ordered_to_commit_samples), - ); - - Ok(LatencyBreakdown::new(samples)) -} - pub struct LoadTestPhaseStats { pub emitter_stats: TxnStats, pub actual_duration: Duration, diff --git a/testsuite/testcases/src/load_vs_perf_benchmark.rs b/testsuite/testcases/src/load_vs_perf_benchmark.rs index cec31ae781a5a..c86ed7486b73a 100644 --- a/testsuite/testcases/src/load_vs_perf_benchmark.rs +++ b/testsuite/testcases/src/load_vs_perf_benchmark.rs @@ -4,9 +4,8 @@ use crate::NetworkLoadTest; use aptos_forge::{ args::TransactionTypeArg, - success_criteria::{ - LatencyBreakdown, LatencyBreakdownSlice, SuccessCriteria, SuccessCriteriaChecker, - }, + prometheus_metrics::{LatencyBreakdown, LatencyBreakdownSlice}, + success_criteria::{SuccessCriteria, SuccessCriteriaChecker}, EmitJobMode, EmitJobRequest, NetworkContext, NetworkTest, Result, Test, TxnStats, }; use aptos_logger::info; @@ -237,10 +236,10 @@ fn to_table(results: &[SingleRunStats]) -> Vec { rate.p50_latency, rate.p90_latency, rate.p99_latency, - result.latency_breakdown.get_samples(LatencyBreakdownSlice::QsBatchToPos).max_sample(), - result.latency_breakdown.get_samples(LatencyBreakdownSlice::QsPosToProposal).max_sample(), - result.latency_breakdown.get_samples(LatencyBreakdownSlice::ConsensusProposalToOrdered).max_sample(), - result.latency_breakdown.get_samples(LatencyBreakdownSlice::ConsensusOrderedToCommit).max_sample(), + result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsBatchToPos).max_sample(), + result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsPosToProposal).max_sample(), + result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusProposalToOrdered).max_sample(), + result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusOrderedToCommit).max_sample(), result.actual_duration.as_secs() )); }