Skip to content

Commit

Permalink
refactoring to cleanup system metrics too
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-aptos committed Aug 3, 2023
1 parent 28cdb30 commit e936f92
Show file tree
Hide file tree
Showing 13 changed files with 357 additions and 446 deletions.
4 changes: 2 additions & 2 deletions testsuite/forge-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use aptos_config::config::{ChainHealthBackoffValues, ConsensusConfig, PipelineBa
use aptos_forge::{
args::TransactionTypeArg,
success_criteria::{
LatencyBreakdownThreshold, LatencyType, StateProgressThreshold, SuccessCriteria,
LatencyBreakdownThreshold, LatencyType, MetricsThreshold, StateProgressThreshold,
SuccessCriteria, SystemMetricsThreshold,
},
system_metrics::{MetricsThreshold, SystemMetricsThreshold},
ForgeConfig, Options, *,
};
use aptos_logger::{info, Level};
Expand Down
16 changes: 14 additions & 2 deletions testsuite/forge/src/backend/k8s/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,27 @@ pub async fn query_range_with_metadata(
new_query
)
})?;
Ok(r.as_range()
let range = r.as_range()
.ok_or_else(|| {
anyhow!(
"Failed to get range from prometheus response. start={}, end={}, query={}",
start_time,
end_time,
new_query
)
})?
})?;
info!("For Query {} got range {:?}", new_query, range);
if range.len() != 1 {
bail!(
"Expected only one range vector from prometheus, recieved {} ({:?}). start={}, end={}, query={}",
range.len(),
range,
start_time,
end_time,
new_query
);
}
Ok(range
.first()
.ok_or_else(|| {
anyhow!(
Expand Down
24 changes: 0 additions & 24 deletions testsuite/forge/src/backend/k8s/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
use crate::{
check_for_container_restart, create_k8s_client, delete_all_chaos, get_default_pfn_node_config,
get_free_port, get_stateful_set_image, install_public_fullnode,
interface::system_metrics::{query_prometheus_system_metrics, SystemMetricsThreshold},
node::K8sNode,
prometheus::{self, query_range_with_metadata, query_with_metadata},
query_sequence_number, set_stateful_set_image_tag, uninstall_testnet_resources, ChainInfo,
Expand Down Expand Up @@ -434,29 +433,6 @@ impl Swarm for K8sSwarm {
bail!("No prom client");
}

async fn ensure_healthy_system_metrics(
&mut self,
start_time: i64,
end_time: i64,
threshold: SystemMetricsThreshold,
) -> Result<()> {
if let Some(c) = &self.prom_client {
let system_metrics = query_prometheus_system_metrics(
c,
start_time,
end_time,
30.0,
&self.kube_namespace,
)
.await?;
threshold.ensure_threshold(&system_metrics)?;
info!("System metrics are healthy");
Ok(())
} else {
bail!("No prom client");
}
}

fn chain_info_for_node(&mut self, idx: usize) -> ChainInfo<'_> {
let rest_api_url = self.get_rest_api_url(idx);
let inspection_service_url = self.get_inspection_service_url(idx);
Expand Down
13 changes: 2 additions & 11 deletions testsuite/forge/src/backend/local/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
interface::system_metrics::SystemMetricsThreshold, ChainInfo, FullNode, HealthCheckError,
LocalNode, LocalVersion, Node, Swarm, SwarmChaos, SwarmExt, Validator, Version,
ChainInfo, FullNode, HealthCheckError, LocalNode, LocalVersion, Node, Swarm, SwarmChaos,
SwarmExt, Validator, Version,
};
use anyhow::{anyhow, bail, Result};
use aptos::common::types::EncodingType;
Expand Down Expand Up @@ -638,15 +638,6 @@ impl Swarm for LocalSwarm {
todo!()
}

async fn ensure_healthy_system_metrics(
&mut self,
_start_time: i64,
_end_time: i64,
_threshold: SystemMetricsThreshold,
) -> Result<()> {
todo!()
}

fn chain_info_for_node(&mut self, idx: usize) -> ChainInfo<'_> {
let rest_api_url = self
.validators()
Expand Down
2 changes: 1 addition & 1 deletion testsuite/forge/src/interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub use chaos::*;
mod node;
pub use node::*;
mod chain_info;
pub mod system_metrics;
pub mod prometheus_metrics;

use aptos_framework::ReleaseBundle;
pub use chain_info::*;
Expand Down
3 changes: 2 additions & 1 deletion testsuite/forge/src/interface/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

use super::Test;
use crate::{
success_criteria::{LatencyBreakdown, SuccessCriteria, SuccessCriteriaChecker},
prometheus_metrics::LatencyBreakdown,
success_criteria::{SuccessCriteria, SuccessCriteriaChecker},
CoreContext, Result, Swarm, TestReport,
};
use aptos_transaction_emitter_lib::{EmitJobRequest, TxnStats};
Expand Down
184 changes: 184 additions & 0 deletions testsuite/forge/src/interface/prometheus_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::Swarm;
use prometheus_http_query::response::Sample;
use std::{collections::BTreeMap, fmt};

/// Newtype around the list of Prometheus [`Sample`]s returned for one query.
#[derive(Clone)]
pub struct MetricSamples(Vec<Sample>);

impl MetricSamples {
    /// Wraps `samples` as-is; the vector is neither copied nor reordered.
    pub fn new(samples: Vec<Sample>) -> Self {
        Self(samples)
    }

    /// Returns the largest sample value.
    ///
    /// NaN samples are ignored: a ratio query whose denominator is zero can
    /// produce NaN, and comparing it with `partial_cmp(..).unwrap()` would
    /// panic instead of reporting a usable maximum.
    ///
    /// Returns `0.0` when there are no samples, or when every sample is NaN.
    pub fn max_sample(&self) -> f64 {
        self.0
            .iter()
            .map(|s| s.value())
            // Drop NaN up front so the reduction below only sees comparable values.
            .filter(|v| !v.is_nan())
            .reduce(f64::max)
            .unwrap_or_default()
    }

    /// Borrows the underlying samples.
    ///
    /// The `&Vec<Sample>` return type is kept for compatibility with existing callers.
    pub fn get(&self) -> &Vec<Sample> {
        &self.0
    }
}

impl fmt::Debug for MetricSamples {
    /// Renders the samples as a list of `(value, timestamp)` pairs.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut pairs = Vec::with_capacity(self.0.len());
        for sample in &self.0 {
            pairs.push((sample.value(), sample.timestamp()));
        }
        // Formatted through a fresh `{:?}` so outer formatter flags are not applied to the list.
        write!(f, "{:?}", pairs)
    }
}

/// CPU and memory sample series gathered for a test run.
#[derive(Clone, Debug)]
pub struct SystemMetrics {
    /// Samples produced by the CPU usage query.
    pub cpu_core_metrics: MetricSamples,
    /// Samples produced by the memory usage query.
    pub memory_bytes_metrics: MetricSamples,
}

impl SystemMetrics {
    /// Builds a `SystemMetrics` by wrapping each raw sample vector in [`MetricSamples`].
    pub fn new(cpu_metrics: Vec<Sample>, memory_metrics: Vec<Sample>) -> Self {
        let cpu_core_metrics = MetricSamples::new(cpu_metrics);
        let memory_bytes_metrics = MetricSamples::new(memory_metrics);
        Self {
            cpu_core_metrics,
            memory_bytes_metrics,
        }
    }
}

pub async fn fetch_system_metrics(
swarm: &dyn Swarm,
start_time: i64,
end_time: i64,
) -> anyhow::Result<SystemMetrics> {
let cpu_query = r#"avg(rate(container_cpu_usage_seconds_total{container=~"validator"}[30s]))"#;
let memory_query = r#"avg(container_memory_rss{container=~"validator"})"#;

let cpu_samples = swarm
.query_range_metrics(cpu_query, start_time, end_time, None)
.await?;

let memory_samples = swarm
.query_range_metrics(memory_query, start_time, end_time, None)
.await?;

Ok(SystemMetrics::new(cpu_samples, memory_samples))
}

/// Identifies one segment of the latency breakdown collected by
/// `fetch_latency_breakdown`; used as the key of `LatencyBreakdown`.
///
/// The derived `Ord` follows declaration order, so reordering variants changes
/// the iteration order of any `BTreeMap` keyed by this enum.
#[derive(Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
pub enum LatencyBreakdownSlice {
/// Filled from the `quorum_store_batch_to_PoS_duration` sum/count ratio.
QsBatchToPos,
/// Filled from the `quorum_store_pos_to_pull` sum/count ratio.
QsPosToProposal,
/// Filled from `aptos_consensus_block_tracing` with `stage="ordered"`.
ConsensusProposalToOrdered,
/// Filled from the "committed" query minus the "ordered" query.
ConsensusOrderedToCommit,
/// Filled from `aptos_consensus_block_tracing` with `stage="committed"`.
ConsensusProposalToCommit,
}

/// Per-slice latency samples, keyed by [`LatencyBreakdownSlice`].
#[derive(Clone, Debug)]
pub struct LatencyBreakdown(BTreeMap<LatencyBreakdownSlice, MetricSamples>);

impl LatencyBreakdown {
    /// Wraps an already-populated map of slice -> samples.
    pub fn new(latency: BTreeMap<LatencyBreakdownSlice, MetricSamples>) -> Self {
        Self(latency)
    }

    /// Returns the samples recorded for `slice`.
    ///
    /// # Panics
    ///
    /// Panics if the map holds no entry for `slice`.
    pub fn get_samples(&self, slice: &LatencyBreakdownSlice) -> &MetricSamples {
        match self.0.get(slice) {
            Some(samples) => samples,
            None => panic!("Missing latency breakdown for {:?}", slice),
        }
    }
}

pub async fn fetch_latency_breakdown(
swarm: &dyn Swarm,
start_time: u64,
end_time: u64,
) -> anyhow::Result<LatencyBreakdown> {
// Averaging over 1m, and skipping data points at the start that would take averages outside of the interval.
let start_time_adjusted = start_time + 60;
let consensus_proposal_to_ordered_query = r#"quantile(0.67, rate(aptos_consensus_block_tracing_sum{role=~"validator", stage="ordered"}[1m]) / rate(aptos_consensus_block_tracing_count{role=~"validator", stage="ordered"}[1m]))"#;
let consensus_proposal_to_commit_query = r#"quantile(0.67, rate(aptos_consensus_block_tracing_sum{role=~"validator", stage="committed"}[1m]) / rate(aptos_consensus_block_tracing_count{role=~"validator", stage="committed"}[1m]))"#;

let qs_batch_to_pos_query = r#"sum(rate(quorum_store_batch_to_PoS_duration_sum{role=~"validator"}[1m])) / sum(rate(quorum_store_batch_to_PoS_duration_count{role=~"validator"}[1m]))"#;
let qs_pos_to_proposal_query = r#"sum(rate(quorum_store_pos_to_pull_sum{role=~"validator"}[1m])) / sum(rate(quorum_store_pos_to_pull_count{role=~"validator"}[1m]))"#;

let consensus_proposal_to_ordered_samples = swarm
.query_range_metrics(
consensus_proposal_to_ordered_query,
start_time_adjusted as i64,
end_time as i64,
None,
)
.await?;

let consensus_proposal_to_commit_samples = swarm
.query_range_metrics(
consensus_proposal_to_commit_query,
start_time_adjusted as i64,
end_time as i64,
None,
)
.await?;

let consensus_ordered_to_commit_samples = swarm
.query_range_metrics(
&format!(
"{} - {}",
consensus_proposal_to_commit_query, consensus_proposal_to_ordered_query
),
start_time_adjusted as i64,
end_time as i64,
None,
)
.await?;

let qs_batch_to_pos_samples = swarm
.query_range_metrics(
qs_batch_to_pos_query,
start_time_adjusted as i64,
end_time as i64,
None,
)
.await?;

let qs_pos_to_proposal_samples = swarm
.query_range_metrics(
qs_pos_to_proposal_query,
start_time_adjusted as i64,
end_time as i64,
None,
)
.await?;

let mut samples = BTreeMap::new();
samples.insert(
LatencyBreakdownSlice::QsBatchToPos,
MetricSamples::new(qs_batch_to_pos_samples),
);
samples.insert(
LatencyBreakdownSlice::QsPosToProposal,
MetricSamples::new(qs_pos_to_proposal_samples),
);
samples.insert(
LatencyBreakdownSlice::ConsensusProposalToOrdered,
MetricSamples::new(consensus_proposal_to_ordered_samples),
);
samples.insert(
LatencyBreakdownSlice::ConsensusOrderedToCommit,
MetricSamples::new(consensus_ordered_to_commit_samples),
);
samples.insert(
LatencyBreakdownSlice::ConsensusProposalToCommit,
MetricSamples::new(consensus_proposal_to_commit_samples),
);

Ok(LatencyBreakdown::new(samples))
}
10 changes: 1 addition & 9 deletions testsuite/forge/src/interface/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
interface::system_metrics::SystemMetricsThreshold, AptosPublicInfo, ChainInfo, FullNode,
NodeExt, Result, SwarmChaos, Validator, Version,
AptosPublicInfo, ChainInfo, FullNode, NodeExt, Result, SwarmChaos, Validator, Version,
};
use anyhow::{anyhow, bail};
use aptos_config::config::NodeConfig;
Expand Down Expand Up @@ -85,13 +84,6 @@ pub trait Swarm: Sync {
async fn ensure_no_validator_restart(&self) -> Result<()>;
async fn ensure_no_fullnode_restart(&self) -> Result<()>;

async fn ensure_healthy_system_metrics(
&mut self,
start_time: i64,
end_time: i64,
threshold: SystemMetricsThreshold,
) -> Result<()>;

// Get prometheus metrics from the swarm
async fn query_metrics(
&self,
Expand Down
Loading

0 comments on commit e936f92

Please sign in to comment.