[forge] Add latency for different workloads
igor-aptos committed Aug 2, 2023
1 parent 21b4764 commit 8b5de60
Showing 3 changed files with 186 additions and 69 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/forge-stable.yaml
@@ -127,6 +127,18 @@ jobs:
FORGE_TEST_SUITE: realistic_env_load_sweep
POST_TO_SLACK: true

run-forge-realistic-env-workload-sweep:
if: ${{ github.event_name != 'pull_request' && always() }}
needs: [determine-test-metadata, run-forge-realistic-env-max-workload-long] # Only run after the previous job completes
uses: aptos-labs/aptos-core/.github/workflows/workflow-run-forge.yaml@main
secrets: inherit
with:
IMAGE_TAG: ${{ needs.determine-test-metadata.outputs.IMAGE_TAG }}
FORGE_NAMESPACE: forge-realistic-env-workload-sweep-${{ needs.determine-test-metadata.outputs.IMAGE_TAG }}
FORGE_RUNNER_DURATION_SECS: 1500 # Run for 25 minutes (5 tests, each for 300 seconds)
FORGE_TEST_SUITE: realistic_env_workload_sweep
POST_TO_SLACK: true

run-forge-realistic-env-graceful-overload:
if: ${{ github.event_name != 'pull_request' && always() }}
needs: [determine-test-metadata, run-forge-realistic-env-load-sweep] # Only run after the previous job completes
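For context, the FORGE_RUNNER_DURATION_SECS value of 1500 is simply the five sweep workloads at roughly 300 seconds apiece. A minimal sketch of that arithmetic, using a hypothetical helper name (the real per-workload split happens inside LoadVsPerfBenchmark and is not shown in this hunk):

```rust
use std::time::Duration;

// Hypothetical helper for illustration only; forge's actual split logic may differ.
fn per_workload_duration(total: Duration, num_workloads: u32) -> Duration {
    total / num_workloads
}

fn main() {
    let total = Duration::from_secs(1500); // FORGE_RUNNER_DURATION_SECS
    let each = per_workload_duration(total, 5); // five workloads in the sweep
    assert_eq!(each.as_secs(), 300);
    println!("each workload runs for ~{}s", each.as_secs());
}
```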
123 changes: 98 additions & 25 deletions testsuite/forge-cli/src/main.rs
@@ -252,9 +252,15 @@ fn main() -> Result<()> {
logger.build();

let args = Args::parse();
let duration = Duration::from_secs(args.duration_secs as u64);
let duration = Duration::from_secs(2100); // args.duration_secs as u64);
let suite_name: &str = args.suite.as_ref();

let suite_name = if suite_name == "realistic_env_max_load" {
"realistic_env_workload_sweep"
} else {
panic!("")
};

let runtime = Runtime::new()?;
match args.cli_cmd {
// cmd input for test
@@ -526,6 +532,7 @@ fn single_test_suite(
// Rest of the tests:
"realistic_env_max_load_large" => realistic_env_max_load_test(duration, test_cmd, 20, 10),
"realistic_env_load_sweep" => realistic_env_load_sweep_test(),
"realistic_env_workload_sweep" => realistic_env_workload_sweep_test(),
"realistic_env_graceful_overload" => realistic_env_graceful_overload(),
"realistic_network_tuned_for_throughput" => realistic_network_tuned_for_throughput_test(),
"epoch_changer_performance" => epoch_changer_performance(),
@@ -796,34 +803,26 @@ fn consensus_stress_test() -> ForgeConfig {
})
}

fn realistic_env_load_sweep_test() -> ForgeConfig {
fn realistic_env_sweep_wrap(
num_validators: usize,
num_fullnodes: usize,
test: LoadVsPerfBenchmark,
) -> ForgeConfig {
ForgeConfig::default()
.with_initial_validator_count(NonZeroUsize::new(20).unwrap())
.with_initial_fullnode_count(10)
.add_network_test(wrap_with_realistic_env(LoadVsPerfBenchmark {
test: Box::new(PerformanceBenchmark),
workloads: Workloads::TPS(&[10, 100, 1000, 3000, 5000]),
criteria: [
(9, 1.5, 3., 4.),
(95, 1.5, 3., 4.),
(950, 2., 3., 4.),
(2750, 2.5, 3.5, 4.5),
(4600, 3., 4., 5.),
]
.into_iter()
.map(|(min_tps, max_lat_p50, max_lat_p90, max_lat_p99)| {
SuccessCriteria::new(min_tps)
.add_max_expired_tps(0)
.add_max_failed_submission_tps(0)
.add_latency_threshold(max_lat_p50, LatencyType::P50)
.add_latency_threshold(max_lat_p90, LatencyType::P90)
.add_latency_threshold(max_lat_p99, LatencyType::P99)
})
.collect(),
.with_initial_validator_count(NonZeroUsize::new(num_validators).unwrap())
.with_initial_fullnode_count(num_fullnodes)
.with_node_helm_config_fn(Arc::new(move |helm_values| {
helm_values["validator"]["config"]["execution"]
["processed_transactions_detailed_counters"] = true.into();
}))
.add_network_test(wrap_with_realistic_env(test))
// Test inherits the main EmitJobRequest, so update here for more precise latency measurements
.with_emit_job(
EmitJobRequest::default().latency_polling_interval(Duration::from_millis(100)),
EmitJobRequest::default()
.mode(EmitJobMode::MaxLoad {
mempool_backlog: 10000,
})
.latency_polling_interval(Duration::from_millis(100)),
)
.with_genesis_helm_config_fn(Arc::new(|helm_values| {
// no epoch change.
@@ -840,6 +839,80 @@ fn realistic_env_load_sweep_test() -> ForgeConfig {
)
}

fn realistic_env_load_sweep_test() -> ForgeConfig {
realistic_env_sweep_wrap(20, 10, LoadVsPerfBenchmark {
test: Box::new(PerformanceBenchmark),
workloads: Workloads::TPS(&[10, 100, 1000, 3000, 5000]),
criteria: [
(9, 1.5, 3., 4.),
(95, 1.5, 3., 4.),
(950, 2., 3., 4.),
(2750, 2.5, 3.5, 4.5),
(4600, 3., 4., 5.),
]
.into_iter()
.map(|(min_tps, max_lat_p50, max_lat_p90, max_lat_p99)| {
SuccessCriteria::new(min_tps)
.add_max_expired_tps(0)
.add_max_failed_submission_tps(0)
.add_latency_threshold(max_lat_p50, LatencyType::P50)
.add_latency_threshold(max_lat_p90, LatencyType::P90)
.add_latency_threshold(max_lat_p99, LatencyType::P99)
})
.collect(),
})
}

fn realistic_env_workload_sweep_test() -> ForgeConfig {
realistic_env_sweep_wrap(7, 3, LoadVsPerfBenchmark {
test: Box::new(PerformanceBenchmark),
workloads: Workloads::TRANSACTIONS(&[
TransactionWorkload {
transaction_type: TransactionTypeArg::CoinTransfer,
num_modules: 1,
unique_senders: false,
},
TransactionWorkload {
transaction_type: TransactionTypeArg::NoOp,
num_modules: 100,
unique_senders: false,
},
TransactionWorkload {
transaction_type: TransactionTypeArg::ModifyGlobalResource,
num_modules: 1,
unique_senders: true,
},
TransactionWorkload {
transaction_type: TransactionTypeArg::TokenV2AmbassadorMint,
num_modules: 1,
unique_senders: true,
},
TransactionWorkload {
transaction_type: TransactionTypeArg::PublishPackage,
num_modules: 1,
unique_senders: true,
},
]),
criteria: [
(1, 1.5, 3., 4.),
(1, 1.5, 3., 4.),
(1, 2., 3., 4.),
(1, 2.5, 3.5, 4.5),
(1, 3., 4., 5.),
]
.into_iter()
.map(|(min_tps, max_lat_p50, max_lat_p90, max_lat_p99)| {
SuccessCriteria::new(min_tps)
.add_max_expired_tps(0)
.add_max_failed_submission_tps(0)
.add_latency_threshold(max_lat_p50, LatencyType::P50)
.add_latency_threshold(max_lat_p90, LatencyType::P90)
.add_latency_threshold(max_lat_p99, LatencyType::P99)
})
.collect(),
})
}

fn load_vs_perf_benchmark() -> ForgeConfig {
ForgeConfig::default()
.with_initial_validator_count(NonZeroUsize::new(20).unwrap())
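Worth noting about the new suite above: the criteria list is consumed positionally (`self.criteria.get(index)`), so it must stay index-aligned with the workloads list. A hedged illustration of that pairing, using plain arrays and tuples instead of the real Workloads and SuccessCriteria types:

```rust
fn main() {
    // Same order as Workloads::TRANSACTIONS in realistic_env_workload_sweep_test;
    // names abbreviated for illustration.
    let workloads = [
        "CoinTransfer",
        "NoOp(100 modules)",
        "ModifyGlobalResource",
        "TokenV2AmbassadorMint",
        "PublishPackage",
    ];
    // (min_tps, max p50, max p90, max p99) -- one tuple per workload, index-aligned.
    let criteria = [
        (1, 1.5, 3.0, 4.0),
        (1, 1.5, 3.0, 4.0),
        (1, 2.0, 3.0, 4.0),
        (1, 2.5, 3.5, 4.5),
        (1, 3.0, 4.0, 5.0),
    ];
    assert_eq!(workloads.len(), criteria.len());
    for (w, (min_tps, p50, p90, p99)) in workloads.iter().zip(criteria) {
        println!("{w}: min_tps={min_tps}, p50<{p50}s, p90<{p90}s, p99<{p99}s");
    }
}
```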
120 changes: 76 additions & 44 deletions testsuite/testcases/src/load_vs_perf_benchmark.rs
@@ -10,10 +10,7 @@ use aptos_forge::{
};
use aptos_logger::info;
use rand::SeedableRng;
use std::{
fmt::{self, Debug, Display},
time::Duration,
};
use std::{fmt::Debug, time::Duration};
use tokio::runtime::Runtime;

pub struct SingleRunStats {
@@ -39,8 +36,20 @@ impl Workloads {

fn name(&self, index: usize) -> String {
match self {
Self::TPS(tpss) => tpss[index].to_string(),
Self::TRANSACTIONS(workloads) => workloads[index].to_string(),
Self::TPS(tpss) => format!("TPS({})", tpss[index]),
Self::TRANSACTIONS(workloads) => format!("Workload({})", workloads[index].name()),
}
}

fn phase_name(&self, index: usize, phase: usize) -> String {
match self {
Self::TPS(_tpss) => unreachable!("TPS workload does not have phases"),
Self::TRANSACTIONS(workloads) => format!(
"Workload({}, phase={}, {})",
index,
phase,
workloads[index].phase_name(phase)
),
}
}

@@ -65,8 +74,6 @@ impl TransactionWorkload {
TransactionTypeArg::AccountGenerationLargePool.materialize(1, false);

if self.unique_senders {
request.transaction_type(self.transaction_type.materialize(self.num_modules, false))
} else {
let write_type = self.transaction_type.materialize(self.num_modules, true);
request.transaction_mix_per_phase(vec![
// warmup
@@ -76,13 +83,39 @@ impl TransactionWorkload {
// cooldown
vec![(write_type, 1)],
])
} else {
request.transaction_type(self.transaction_type.materialize(self.num_modules, false))
}
}
}

impl Display for TransactionWorkload {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Debug::fmt(self, f)
fn name(&self) -> String {
// assert!(!self.unique_senders);
format!(
"{:?}{}",
self.transaction_type,
if self.num_modules > 1 {
format!("({} modules)", self.num_modules)
} else {
"".to_string()
}
)
}

fn phase_name(&self, phase: usize) -> String {
assert!(self.unique_senders);
format!(
"{:?}{}",
match phase {
0 => TransactionTypeArg::AccountGenerationLargePool,
1 => self.transaction_type,
_ => unreachable!(),
},
if self.num_modules > 1 {
format!("({} modules)", self.num_modules)
} else {
"".to_string()
}
)
}
}

@@ -124,7 +157,7 @@ impl LoadVsPerfBenchmark {
for (phase, phase_stats) in stats_by_phase.into_iter().enumerate() {
result.push(SingleRunStats {
name: if phased {
format!("{}_phase_{}", workloads.name(index), phase)
workloads.phase_name(index, phase)
} else {
workloads.name(index)
},
@@ -164,12 +197,7 @@ impl NetworkTest for LoadVsPerfBenchmark {
}

info!("Starting for {}", self.workloads.name(index));
results.append(&mut self.evaluate_single(
ctx,
&self.workloads,
index,
individual_duration,
)?);
results.push(self.evaluate_single(ctx, &self.workloads, index, individual_duration)?);

// Note: uncomment below to perform reconfig during a test
// let mut aptos_info = ctx.swarm().aptos_public_info();
Expand All @@ -186,25 +214,27 @@ impl NetworkTest for LoadVsPerfBenchmark {
ctx.report.report_text(line);
}
for (index, result) in results.iter().enumerate() {
let rate = result.stats.rate();
// always take last phase for success criteria
let target_result = &result[result.len() - 1];
let rate = target_result.stats.rate();
if let Some(criteria) = self.criteria.get(index) {
SuccessCriteriaChecker::check_core_for_success(
criteria,
ctx.report,
&rate,
Some(&result.latency_breakdown),
Some(result.name.clone()),
Some(&target_result.latency_breakdown),
Some(target_result.name.clone()),
)?;
}
}
Ok(())
}
}
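To make the reporting change above concrete: each sweep entry now carries a vector of per-phase stats, and only the final phase is held to the success criteria. A small sketch of that selection using plain strings in place of SingleRunStats (illustrative only):

```rust
fn main() {
    // One inner Vec per workload run; phased (unique-sender) workloads report
    // multiple per-phase entries, plain workloads report a single entry.
    let results: Vec<Vec<&str>> = vec![
        vec!["TPS(1000)"],
        vec![
            "Workload(1, phase=0, AccountGenerationLargePool)",
            "Workload(1, phase=1, CoinTransfer)",
        ],
    ];
    for run in &results {
        // Mirrors `let target_result = &result[result.len() - 1];` above:
        // only the final phase is judged against the success criteria.
        let judged = run[run.len() - 1];
        println!("checking success criteria against: {judged}");
    }
}
```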

fn to_table(results: &[SingleRunStats]) -> Vec<String> {
fn to_table(results: &[Vec<SingleRunStats>]) -> Vec<String> {
let mut table = Vec::new();
table.push(format!(
"{: <30} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12}",
"{: <40} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12}",
"workload",
"submitted/s",
"committed/s",
Expand All @@ -222,26 +252,28 @@ fn to_table(results: &[SingleRunStats]) -> Vec<String> {
"actual dur"
));

for result in results {
let rate = result.stats.rate();
table.push(format!(
"{: <30} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12}",
result.name,
rate.submitted,
rate.committed,
rate.expired,
rate.failed_submission,
result.ledger_transactions / result.actual_duration.as_secs(),
rate.latency,
rate.p50_latency,
rate.p90_latency,
rate.p99_latency,
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsBatchToPos).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsPosToProposal).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusProposalToOrdered).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusOrderedToCommit).max_sample(),
result.actual_duration.as_secs()
));
for run_results in results {
for result in run_results {
let rate = result.stats.rate();
table.push(format!(
"{: <40} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12.3} | {: <12}",
result.name,
rate.submitted,
rate.committed,
rate.expired,
rate.failed_submission,
result.ledger_transactions / result.actual_duration.as_secs(),
rate.latency,
rate.p50_latency,
rate.p90_latency,
rate.p99_latency,
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsBatchToPos).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::QsPosToProposal).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusProposalToOrdered).max_sample(),
result.latency_breakdown.get_samples(&LatencyBreakdownSlice::ConsensusOrderedToCommit).max_sample(),
result.actual_duration.as_secs()
));
}
}

table
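Finally, an illustration of how table rows are now labeled, reproducing the format strings from Workloads::name and Workloads::phase_name in this diff; the concrete values are arbitrary examples:

```rust
fn main() {
    // TPS workloads keep a single row; transaction workloads get one row per phase.
    let tps_row = format!("TPS({})", 1000);
    let workload_row = format!("Workload({})", "CoinTransfer");
    let phase_row = format!("Workload({}, phase={}, {})", 2, 0, "AccountGenerationLargePool");

    assert_eq!(tps_row, "TPS(1000)");
    assert_eq!(workload_row, "Workload(CoinTransfer)");
    assert_eq!(phase_row, "Workload(2, phase=0, AccountGenerationLargePool)");
}
```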
