Skip to content

Commit

Permalink
[forge] LatencyBreakdown counters and use them in success criteria (#…
Browse files Browse the repository at this point in the history
…9393)

* refactor SystemMetrics to be cleaner, and build LatencyBreakdown metrics in the same flow
* have Swarm surface an API for generic range queries against prometheus, and move validation (and the decision of what needs to be fetched) into SuccessCriteria
* move retries from system metrics alone, to prometheus calls themselves
* rename system_metrics.rs to prometheus_metrics.rs - and have fetching of system and latency metrics there
* move threshold logic to SuccessCriteria
* add fetching of the QS and consensus latency breakdowns, and add a way to assert they pass.
* fixing construct_query_with_extra_labels, and updating land_blocking checks
  • Loading branch information
igor-aptos authored Aug 3, 2023
1 parent 8fe69b2 commit 1063755
Show file tree
Hide file tree
Showing 14 changed files with 717 additions and 418 deletions.
97 changes: 47 additions & 50 deletions testsuite/forge-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use anyhow::{format_err, Context, Result};
use aptos_config::config::{ChainHealthBackoffValues, ConsensusConfig, PipelineBackpressureValues};
use aptos_forge::{
args::TransactionTypeArg,
success_criteria::{LatencyType, StateProgressThreshold, SuccessCriteria},
system_metrics::{MetricsThreshold, SystemMetricsThreshold},
prometheus_metrics::LatencyBreakdownSlice,
success_criteria::{
LatencyBreakdownThreshold, LatencyType, MetricsThreshold, StateProgressThreshold,
SuccessCriteria, SystemMetricsThreshold,
},
ForgeConfig, Options, *,
};
use aptos_logger::{info, Level};
Expand Down Expand Up @@ -41,7 +44,7 @@ use aptos_testcases::{
validator_reboot_stress_test::ValidatorRebootStressTest,
CompositeNetworkTest,
};
use clap::{Parser, Subcommand};
use clap::{Parser, Subcommand, __derive_refs::once_cell::sync::Lazy};
use futures::stream::{FuturesUnordered, StreamExt};
use rand::{rngs::ThreadRng, seq::SliceRandom, Rng};
use std::{
Expand Down Expand Up @@ -216,6 +219,24 @@ struct Resize {
enable_haproxy: bool,
}

// common metrics thresholds:
static SYSTEM_12_CORES_5GB_THRESHOLD: Lazy<SystemMetricsThreshold> = Lazy::new(|| {
SystemMetricsThreshold::new(
// Check that we don't use more than 12 CPU cores for 30% of the time.
MetricsThreshold::new(12.0, 30),
// Check that we don't use more than 5 GB of memory for 30% of the time.
MetricsThreshold::new_gb(5.0, 30),
)
});
static SYSTEM_12_CORES_10GB_THRESHOLD: Lazy<SystemMetricsThreshold> = Lazy::new(|| {
SystemMetricsThreshold::new(
// Check that we don't use more than 12 CPU cores for 30% of the time.
MetricsThreshold::new(12.0, 30),
// Check that we don't use more than 10 GB of memory for 30% of the time.
MetricsThreshold::new_gb(10.0, 30),
)
});

/// Make an easy to remember random namespace for your testnet
fn random_namespace<R: Rng>(dictionary: Vec<String>, rng: &mut R) -> Result<String> {
// Pick four random words
Expand Down Expand Up @@ -679,12 +700,7 @@ fn twin_validator_test() -> ForgeConfig {
SuccessCriteria::new(5500)
.add_no_restarts()
.add_wait_for_catchup_s(60)
.add_system_metrics_threshold(SystemMetricsThreshold::new(
// Check that we don't use more than 12 CPU cores for 30% of the time.
MetricsThreshold::new(12, 30),
// Check that we don't use more than 5 GB of memory for 30% of the time.
MetricsThreshold::new(5 * 1024 * 1024 * 1024, 30),
))
.add_system_metrics_threshold(SYSTEM_12_CORES_5GB_THRESHOLD.clone())
.add_chain_progress(StateProgressThreshold {
max_no_progress_secs: 10.0,
max_round_gap: 4,
Expand Down Expand Up @@ -958,9 +974,9 @@ fn graceful_overload() -> ForgeConfig {
.add_wait_for_catchup_s(120)
.add_system_metrics_threshold(SystemMetricsThreshold::new(
// Check that we don't use more than 12 CPU cores for 30% of the time.
MetricsThreshold::new(12, 40),
MetricsThreshold::new(12.0, 40),
// Check that we don't use more than 5 GB of memory for 30% of the time.
MetricsThreshold::new(5 * 1024 * 1024 * 1024, 30),
MetricsThreshold::new_gb(5.0, 30),
))
.add_latency_threshold(10.0, LatencyType::P50)
.add_latency_threshold(30.0, LatencyType::P90)
Expand Down Expand Up @@ -1009,9 +1025,9 @@ fn realistic_env_graceful_overload() -> ForgeConfig {
.add_system_metrics_threshold(SystemMetricsThreshold::new(
// overload test uses more CPUs than others, so increase the limit
// Check that we don't use more than 18 CPU cores for 30% of the time.
MetricsThreshold::new(18, 40),
MetricsThreshold::new(18.0, 40),
// Check that we don't use more than 5 GB of memory for 30% of the time.
MetricsThreshold::new(5 * 1024 * 1024 * 1024, 30),
MetricsThreshold::new_gb(5.0, 30),
))
.add_latency_threshold(10.0, LatencyType::P50)
.add_latency_threshold(30.0, LatencyType::P90)
Expand Down Expand Up @@ -1417,12 +1433,7 @@ fn validators_join_and_leave() -> ForgeConfig {
SuccessCriteria::new(5000)
.add_no_restarts()
.add_wait_for_catchup_s(240)
.add_system_metrics_threshold(SystemMetricsThreshold::new(
// Check that we don't use more than 12 CPU cores for 30% of the time.
MetricsThreshold::new(12, 30),
// Check that we don't use more than 10 GB of memory for 30% of the time.
MetricsThreshold::new(10 * 1024 * 1024 * 1024, 30),
))
.add_system_metrics_threshold(SYSTEM_12_CORES_10GB_THRESHOLD.clone())
.add_chain_progress(StateProgressThreshold {
max_no_progress_secs: 10.0,
max_round_gap: 4,
Expand Down Expand Up @@ -1452,12 +1463,7 @@ fn land_blocking_test_suite(duration: Duration) -> ForgeConfig {
// Give at least 60s for catchup, give 10% of the run for longer durations.
(duration.as_secs() / 10).max(60),
)
.add_system_metrics_threshold(SystemMetricsThreshold::new(
// Check that we don't use more than 12 CPU cores for 30% of the time.
MetricsThreshold::new(12, 30),
// Check that we don't use more than 10 GB of memory for 30% of the time.
MetricsThreshold::new(10 * 1024 * 1024 * 1024, 30),
))
.add_system_metrics_threshold(SYSTEM_12_CORES_10GB_THRESHOLD.clone())
.add_chain_progress(StateProgressThreshold {
max_no_progress_secs: 10.0,
max_round_gap: 4,
Expand Down Expand Up @@ -1518,13 +1524,19 @@ fn realistic_env_max_load_test(
(duration.as_secs() / 10).max(60),
)
.add_system_metrics_threshold(SystemMetricsThreshold::new(
// Check that we don't use more than 12 CPU cores for 30% of the time.
MetricsThreshold::new(14, max_cpu_threshold),
// Check that we don't use more than 14 CPU cores for 30% of the time.
MetricsThreshold::new(14.0, max_cpu_threshold),
// Check that we don't use more than 10 GB of memory for 30% of the time.
MetricsThreshold::new(10 * 1024 * 1024 * 1024, 30),
MetricsThreshold::new_gb(10.0, 30),
))
.add_latency_threshold(3.0, LatencyType::P50)
.add_latency_threshold(5.0, LatencyType::P90)
.add_latency_threshold(3.4, LatencyType::P50)
.add_latency_threshold(4.5, LatencyType::P90)
.add_latency_breakdown_threshold(LatencyBreakdownThreshold::new_strict(vec![
(LatencyBreakdownSlice::QsBatchToPos, 0.3),
(LatencyBreakdownSlice::QsPosToProposal, 0.25),
(LatencyBreakdownSlice::ConsensusProposalToOrdered, 0.8),
(LatencyBreakdownSlice::ConsensusOrderedToCommit, 0.6),
]))
.add_chain_progress(StateProgressThreshold {
max_no_progress_secs: 10.0,
max_round_gap: 4,
Expand Down Expand Up @@ -1580,9 +1592,9 @@ fn realistic_network_tuned_for_throughput_test() -> ForgeConfig {
// Tuned for throughput uses more cores than regular tests,
// as it achieves higher throughput.
// Check that we don't use more than 14 CPU cores for 30% of the time.
MetricsThreshold::new(14, 30),
MetricsThreshold::new(14.0, 30),
// Check that we don't use more than 10 GB of memory for 30% of the time.
MetricsThreshold::new(10 * 1024 * 1024 * 1024, 30),
MetricsThreshold::new_gb(10.0, 30),
))
.add_chain_progress(StateProgressThreshold {
max_no_progress_secs: 10.0,
Expand Down Expand Up @@ -1612,12 +1624,7 @@ fn chaos_test_suite(duration: Duration) -> ForgeConfig {
},
)
.add_no_restarts()
.add_system_metrics_threshold(SystemMetricsThreshold::new(
// Check that we don't use more than 12 CPU cores for 30% of the time.
MetricsThreshold::new(12, 30),
// Check that we don't use more than 5 GB of memory for 30% of the time.
MetricsThreshold::new(5 * 1024 * 1024 * 1024, 30),
)),
.add_system_metrics_threshold(SYSTEM_12_CORES_5GB_THRESHOLD.clone()),
)
}

Expand Down Expand Up @@ -1791,12 +1798,7 @@ fn quorum_store_reconfig_enable_test() -> ForgeConfig {
SuccessCriteria::new(5000)
.add_no_restarts()
.add_wait_for_catchup_s(240)
.add_system_metrics_threshold(SystemMetricsThreshold::new(
// Check that we don't use more than 12 CPU cores for 30% of the time.
MetricsThreshold::new(12, 30),
// Check that we don't use more than 10 GB of memory for 30% of the time.
MetricsThreshold::new(10 * 1024 * 1024 * 1024, 30),
))
.add_system_metrics_threshold(SYSTEM_12_CORES_10GB_THRESHOLD.clone())
.add_chain_progress(StateProgressThreshold {
max_no_progress_secs: 10.0,
max_round_gap: 4,
Expand Down Expand Up @@ -1863,12 +1865,7 @@ fn multiregion_benchmark_test() -> ForgeConfig {
// Give at least 60s for catchup, give 10% of the run for longer durations.
180,
)
.add_system_metrics_threshold(SystemMetricsThreshold::new(
// Check that we don't use more than 12 CPU cores for 30% of the time.
MetricsThreshold::new(12, 30),
// Check that we don't use more than 10 GB of memory for 30% of the time.
MetricsThreshold::new(10 * 1024 * 1024 * 1024, 30),
))
.add_system_metrics_threshold(SYSTEM_12_CORES_10GB_THRESHOLD.clone())
.add_chain_progress(StateProgressThreshold {
max_no_progress_secs: 10.0,
max_round_gap: 4,
Expand Down
140 changes: 110 additions & 30 deletions testsuite/forge/src/backend/k8s/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,23 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{create_k8s_client, K8sApi, ReadWrite, Result};
use anyhow::bail;
use again::RetryPolicy;
use anyhow::{anyhow, bail};
use aptos_logger::info;
use k8s_openapi::api::core::v1::Secret;
use prometheus_http_query::{response::PromqlResult, Client as PrometheusClient};
use once_cell::sync::Lazy;
use prometheus_http_query::{
response::{PromqlResult, Sample},
Client as PrometheusClient,
};
use reqwest::{header, Client as HttpClient};
use std::{collections::BTreeMap, sync::Arc};
use std::{collections::BTreeMap, sync::Arc, time::Duration};

static PROMETHEUS_RETRY_POLICY: Lazy<RetryPolicy> = Lazy::new(|| {
RetryPolicy::exponential(Duration::from_millis(125))
.with_max_retries(3)
.with_jitter(true)
});

pub async fn get_prometheus_client() -> Result<PrometheusClient> {
// read from the environment
Expand Down Expand Up @@ -91,45 +102,104 @@ async fn create_prometheus_client_from_environment(

pub fn construct_query_with_extra_labels(
query: &str,
labels_map: BTreeMap<String, String>,
labels_map: &BTreeMap<String, String>,
) -> String {
// edit the query string to insert swarm metadata
let mut new_query = query.to_string();
let mut label_start_idx = query.find('{').unwrap_or(query.len());
if label_start_idx == query.len() {
// add a new curly and insert after it
new_query.insert_str(query.len(), "{}");
label_start_idx += 1;
} else {
// add a comma prefix to the existing labels and insert before it
label_start_idx += 1;
new_query.insert(label_start_idx, ',');
}
let mut new_query = "".to_string();

let mut labels_strs = vec![];
for (k, v) in labels_map {
labels_strs.push(format!(r#"{}="{}""#, k, v));
}

let labels = labels_strs.join(",");

// assume no collisions in Forge namespace
new_query.insert_str(label_start_idx, &labels);
new_query
let parts: Vec<&str> = query.split_inclusive('{').collect();
if parts.len() == 1 {
// no labels in query
format!("{}{{{}}}", query, labels)
} else {
let mut parts_iter = parts.into_iter();
let prev = parts_iter.next();
new_query.push_str(prev.unwrap());

for part in parts_iter {
if part.starts_with('}') {
// assume no collisions in Forge namespace
new_query.push_str(&labels);
} else {
// assume no collisions in Forge namespace
new_query.push_str(&labels);
new_query.push(',');
}
new_query.push_str(part);
}
new_query
}
}

pub async fn query_with_metadata(
prom_client: &PrometheusClient,
query: &str,
time: Option<i64>,
timeout: Option<i64>,
labels_map: BTreeMap<String, String>,
labels_map: &BTreeMap<String, String>,
) -> Result<PromqlResult> {
let new_query = construct_query_with_extra_labels(query, labels_map);
match prom_client.query(&new_query, time, timeout).await {
Ok(r) => Ok(r),
Err(e) => bail!(e),
let new_query_ref = &new_query;
PROMETHEUS_RETRY_POLICY
.retry(move || prom_client.query(new_query_ref, time, timeout))
.await
.map_err(|e| anyhow!("Failed to query prometheus for {}: {}", query, e))
}

pub async fn query_range_with_metadata(
prom_client: &PrometheusClient,
query: &str,
start_time: i64,
end_time: i64,
internal_secs: f64,
timeout: Option<i64>,
labels_map: &BTreeMap<String, String>,
) -> Result<Vec<Sample>> {
let new_query = construct_query_with_extra_labels(query, labels_map);
let new_query_ref = &new_query;
let r = PROMETHEUS_RETRY_POLICY
.retry(move || {
prom_client.query_range(new_query_ref, start_time, end_time, internal_secs, timeout)
})
.await
.map_err(|e| {
anyhow!(
"Failed to query prometheus {}. start={}, end={}, query={}",
e,
start_time,
end_time,
new_query
)
})?;
let range = r.as_range().ok_or_else(|| {
anyhow!(
"Failed to get range from prometheus response. start={}, end={}, query={}",
start_time,
end_time,
new_query
)
})?;
if range.len() != 1 {
bail!(
"Expected only one range vector from prometheus, recieved {} ({:?}). start={}, end={}, query={}",
range.len(),
range,
start_time,
end_time,
new_query
);
}
Ok(range
.first()
.unwrap() // safe because we checked length above
.samples()
.to_vec())
}

#[cfg(test)]
Expand Down Expand Up @@ -253,22 +323,32 @@ mod tests {

#[test]
fn test_create_query() {
// test when no existing labels
let original_query = "aptos_connections";
let mut labels_map = BTreeMap::new();
labels_map.insert("a".to_string(), "a".to_string());
labels_map.insert("some_label".to_string(), "blabla".to_string());

// test when no existing labels
let original_query = "aptos_connections";
let expected_query = r#"aptos_connections{a="a",some_label="blabla"}"#;
let new_query = construct_query_with_extra_labels(original_query, &labels_map);
assert_eq!(expected_query, new_query);

// test when empty labels
let original_query = "aptos_connections{}";
let expected_query = r#"aptos_connections{a="a",some_label="blabla"}"#;
let new_query = construct_query_with_extra_labels(original_query, labels_map);
let new_query = construct_query_with_extra_labels(original_query, &labels_map);
assert_eq!(expected_query, new_query);

// test when existing labels
let original_query = r#"aptos_connections{abc="123",def="456"}"#;
let mut labels_map = BTreeMap::new();
labels_map.insert("a".to_string(), "a".to_string());
labels_map.insert("some_label".to_string(), "blabla".to_string());
let expected_query = r#"aptos_connections{a="a",some_label="blabla",abc="123",def="456"}"#;
let new_query = construct_query_with_extra_labels(original_query, labels_map);
let new_query = construct_query_with_extra_labels(original_query, &labels_map);
assert_eq!(expected_query, new_query);

// test when multiple queries
let original_query = r#"aptos_connections{abc="123",def="456"} - aptos_disconnects{abc="123"} / aptos_count{}"#;
let expected_query = r#"aptos_connections{a="a",some_label="blabla",abc="123",def="456"} - aptos_disconnects{a="a",some_label="blabla",abc="123"} / aptos_count{a="a",some_label="blabla"}"#;
let new_query = construct_query_with_extra_labels(original_query, &labels_map);
assert_eq!(expected_query, new_query);
}
}
Loading

0 comments on commit 1063755

Please sign in to comment.