Skip to content

Commit

Permalink
Re-apply qs backpressure increase with buffer latency increase (#13961)…
Browse files Browse the repository at this point in the history
… (#13997)

* [quorum store] reduce backpressure significantly for more TPS (#13558)

As Quorum Store batches are bucketed, and we are looking to increase block limits, now is the time to reduce Quorum Store backpressure.

We now allow 36K transactions outstanding. At 12K TPS, this is approximately 3 seconds worth of batches.

For forge tests, a lot of the queuing shifts from mempool to POS-to-Proposal, so the limits need to be adjusted accordingly.

* increase buffer for expiration in batch creation

* adding buffers on inner traffic as well

---------
  • Loading branch information
igor-aptos authored Jul 15, 2024
1 parent f669e69 commit 6d3beea
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 186 deletions.
4 changes: 2 additions & 2 deletions config/src/config/mempool_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ impl Default for MempoolConfig {
system_transaction_timeout_secs: 600,
system_transaction_gc_interval_ms: 60_000,
broadcast_buckets: DEFAULT_BUCKETS.to_vec(),
eager_expire_threshold_ms: Some(10_000),
eager_expire_time_ms: 3_000,
eager_expire_threshold_ms: Some(15_000),
eager_expire_time_ms: 6_000,
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions config/src/config/quorum_store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ impl Default for QuorumStoreBackPressureConfig {
QuorumStoreBackPressureConfig {
// QS will be backpressured if the remaining total txns is more than this number
// Roughly, target TPS * commit latency seconds
backlog_txn_limit_count: 12_000,
backlog_txn_limit_count: 36_000,
// QS will create batches at the max rate until this number is reached
backlog_per_validator_batch_limit_count: 4,
backlog_per_validator_batch_limit_count: 20,
decrease_duration_ms: 1000,
increase_duration_ms: 1000,
decrease_fraction: 0.5,
dynamic_min_txn_per_s: 160,
dynamic_max_txn_per_s: 4000,
dynamic_max_txn_per_s: 12000,
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions testsuite/forge-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1114,10 +1114,10 @@ fn realistic_env_workload_sweep_test() -> ForgeConfig {
]),
// Investigate/improve to make latency more predictable on different workloads
criteria: [
(7700, 100, 0.3, 0.3, 0.5, 0.5),
(7000, 100, 0.3, 0.3, 0.5, 0.5),
(2000, 300, 0.3, 0.8, 0.6, 1.0),
(3200, 500, 0.3, 0.4, 0.7, 0.7),
(7700, 100, 0.3, 0.5, 0.5, 0.5),
(7000, 100, 0.3, 0.5, 0.5, 0.5),
(2000, 300, 0.3, 1.0, 0.6, 1.0),
(3200, 500, 0.3, 1.5, 0.7, 0.7),
// (150, 0.5, 1.0, 1.5, 0.65),
]
.into_iter()
Expand Down Expand Up @@ -1952,9 +1952,9 @@ fn realistic_env_max_load_test(
.add_system_metrics_threshold(SystemMetricsThreshold::new(
// Check that we don't use more than 18 CPU cores for 10% of the time.
MetricsThreshold::new(18.0, 10),
// Memory starts around 3GB, and grows around 1.2GB/hr in this test.
// Memory starts around 3.5GB, and grows around 1.4GB/hr in this test.
// Check that we don't use more than final expected memory for more than 10% of the time.
MetricsThreshold::new_gb(3.3 + 1.4 * (duration_secs as f64 / 3600.0), 10),
MetricsThreshold::new_gb(3.5 + 1.4 * (duration_secs as f64 / 3600.0), 10),
))
.add_no_restarts()
.add_wait_for_catchup_s(
Expand All @@ -1972,8 +1972,8 @@ fn realistic_env_max_load_test(
LatencyBreakdownThreshold::new_with_breach_pct(
vec![
(LatencyBreakdownSlice::QsBatchToPos, 0.35),
// only reaches close to threshold during epoch change
(LatencyBreakdownSlice::QsPosToProposal, 0.6),
// quorum store backpressure is relaxed, so queueing happens here
(LatencyBreakdownSlice::QsPosToProposal, 2.5),
// can be adjusted down if less backpressure
(LatencyBreakdownSlice::ConsensusProposalToOrdered, 0.85),
// can be adjusted down if less backpressure
Expand Down
284 changes: 150 additions & 134 deletions testsuite/testcases/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ use std::{
};
use tokio::runtime::{Handle, Runtime};

const WARMUP_DURATION_FRACTION: f32 = 0.07;
const COOLDOWN_DURATION_FRACTION: f32 = 0.04;
pub const WARMUP_DURATION_FRACTION: f32 = 0.07;
pub const COOLDOWN_DURATION_FRACTION: f32 = 0.04;

async fn batch_update(
ctx: &mut NetworkContext<'_>,
Expand Down Expand Up @@ -254,7 +254,6 @@ impl NetworkTest for dyn NetworkLoadTest {
.await
.context("no clients replied for start version")?;
let emit_job_request = ctx.emit_job.clone();
let rng = SeedableRng::from_rng(ctx.core().rng())?;
let duration = ctx.global_duration;
let stats_by_phase = self
.network_load_test(
Expand All @@ -263,7 +262,6 @@ impl NetworkTest for dyn NetworkLoadTest {
duration,
WARMUP_DURATION_FRACTION,
COOLDOWN_DURATION_FRACTION,
rng,
)
.await?;

Expand Down Expand Up @@ -328,155 +326,173 @@ impl NetworkTest for dyn NetworkLoadTest {
}
}

impl dyn NetworkLoadTest + '_ {
pub async fn network_load_test<'a>(
&self,
ctx: &mut NetworkContext<'a>,
emit_job_request: EmitJobRequest,
duration: Duration,
warmup_duration_fraction: f32,
cooldown_duration_fraction: f32,
rng: StdRng,
) -> Result<Vec<LoadTestPhaseStats>> {
let destination = self.setup(ctx).await.context("setup NetworkLoadTest")?;
let nodes_to_send_load_to = destination.get_destination_nodes(ctx.swarm.clone()).await;
pub async fn create_buffered_load(
swarm: Arc<tokio::sync::RwLock<Box<dyn Swarm>>>,
nodes_to_send_load_to: &[PeerId],
emit_job_request: EmitJobRequest,
duration: Duration,
warmup_duration_fraction: f32,
cooldown_duration_fraction: f32,
mut inner_test_and_report: Option<(&dyn NetworkLoadTest, &mut TestReport)>,
) -> Result<Vec<LoadTestPhaseStats>> {
// Generate some traffic
let (mut emitter, emit_job_request) = create_emitter_and_request(
swarm.clone(),
emit_job_request,
nodes_to_send_load_to,
StdRng::from_entropy(),
)
.await
.context("create emitter")?;

let clients = swarm
.read()
.await
.get_clients_for_peers(nodes_to_send_load_to, Duration::from_secs(10));

// Generate some traffic
let mut stats_tracking_phases = emit_job_request.get_num_phases();
assert!(stats_tracking_phases > 0 && stats_tracking_phases != 2);
if stats_tracking_phases == 1 {
stats_tracking_phases = 3;
}

let (mut emitter, emit_job_request) = create_emitter_and_request(
ctx.swarm.clone(),
info!("Starting emitting txns for {}s", duration.as_secs());
let mut job = emitter
.start_job(
swarm.read().await.chain_info().root_account,
emit_job_request,
&nodes_to_send_load_to,
rng,
stats_tracking_phases,
)
.await
.context("create emitter")?;
.context("start emitter job")?;

let clients = ctx
.swarm
.read()
.await
.get_clients_for_peers(&nodes_to_send_load_to, Duration::from_secs(10));

let mut stats_tracking_phases = emit_job_request.get_num_phases();
assert!(stats_tracking_phases > 0 && stats_tracking_phases != 2);
if stats_tracking_phases == 1 {
stats_tracking_phases = 3;
}

info!("Starting emitting txns for {}s", duration.as_secs());
let mut job = emitter
.start_job(
ctx.swarm.read().await.chain_info().root_account,
emit_job_request,
stats_tracking_phases,
)
.await
.context("start emitter job")?;

let total_start = PhaseTimingStart::now();
let total_start = PhaseTimingStart::now();

let warmup_duration = duration.mul_f32(warmup_duration_fraction);
let cooldown_duration = duration.mul_f32(cooldown_duration_fraction);
let test_duration = duration - warmup_duration - cooldown_duration;
let phase_duration = test_duration.div_f32((stats_tracking_phases - 2) as f32);
let warmup_duration = duration.mul_f32(warmup_duration_fraction);
let cooldown_duration = duration.mul_f32(cooldown_duration_fraction);
let test_duration = duration - warmup_duration - cooldown_duration;
let phase_duration = test_duration.div_f32((stats_tracking_phases - 2) as f32);

job = job.periodic_stat_forward(warmup_duration, 60).await;
info!("{}s warmup finished", warmup_duration.as_secs());
job = job.periodic_stat_forward(warmup_duration, 60).await;
info!("{}s warmup finished", warmup_duration.as_secs());

let mut phase_timing = Vec::new();
let mut phase_start_network_state = Vec::new();
let test_start = Instant::now();
for i in 0..stats_tracking_phases - 2 {
phase_start_network_state.push(NetworkState::new(&clients).await);
job.start_next_phase();
let mut phase_timing = Vec::new();
let mut phase_start_network_state = Vec::new();
let test_start = Instant::now();
for i in 0..stats_tracking_phases - 2 {
phase_start_network_state.push(NetworkState::new(&clients).await);
job.start_next_phase();

if i > 0 {
info!(
"Starting test phase {} out of {}",
i,
stats_tracking_phases - 2,
);
}
let phase_start = PhaseTimingStart::now();
if i > 0 {
info!(
"Starting test phase {} out of {}",
i,
stats_tracking_phases - 2,
);
}
let phase_start = PhaseTimingStart::now();

let join_stats = Handle::current().spawn(job.periodic_stat_forward(phase_duration, 60));
self.test(ctx.swarm.clone(), ctx.report, phase_duration)
let join_stats = Handle::current().spawn(job.periodic_stat_forward(phase_duration, 60));
if let Some((inner_test, context)) = inner_test_and_report.as_mut() {
inner_test
.test(swarm.clone(), context, phase_duration)
.await
.context("test NetworkLoadTest")?;
job = join_stats.await.context("join stats")?;
phase_timing.push(phase_start.elapsed());
}
let actual_test_duration = test_start.elapsed();
info!(
"{}s test finished after {}s",
test_duration.as_secs(),
actual_test_duration.as_secs()
);

phase_start_network_state.push(NetworkState::new(&clients).await);
job.start_next_phase();
let cooldown_start = Instant::now();
job = join_stats.await.context("join stats")?;
phase_timing.push(phase_start.elapsed());
}
let actual_test_duration = test_start.elapsed();
info!(
"{}s test finished after {}s",
test_duration.as_secs(),
actual_test_duration.as_secs()
);

let cooldown_used = cooldown_start.elapsed();
if cooldown_used < cooldown_duration {
job = job
.periodic_stat_forward(cooldown_duration - cooldown_used, 60)
.await;
}
info!("{}s cooldown finished", cooldown_duration.as_secs());
phase_start_network_state.push(NetworkState::new(&clients).await);
job.start_next_phase();
let cooldown_start = Instant::now();

let total_timing = total_start.elapsed();
let cooldown_used = cooldown_start.elapsed();
if cooldown_used < cooldown_duration {
job = job
.periodic_stat_forward(cooldown_duration - cooldown_used, 60)
.await;
}
info!("{}s cooldown finished", cooldown_duration.as_secs());

let total_timing = total_start.elapsed();
info!(
"Emitting txns ran for {} secs(from {} to {}), stopping job...",
duration.as_secs(),
total_timing.start_unixtime_s,
total_timing.end_unixtime_s,
);
let stats_by_phase = job.stop_job().await;

info!("Stopped job");
info!("Warmup stats: {}", stats_by_phase[0].rate());

let mut stats: Option<TxnStats> = None;
let mut stats_by_phase_filtered = Vec::new();
for i in 0..stats_tracking_phases - 2 {
let next_i = i + 1;
let cur = &stats_by_phase[next_i];
info!("Test stats [test phase {}]: {}", i, cur.rate());
stats = if let Some(previous) = stats {
Some(&previous + cur)
} else {
Some(cur.clone())
};
let latency_breakdown = fetch_latency_breakdown(
swarm.clone(),
phase_timing[i].start_unixtime_s,
phase_timing[i].end_unixtime_s,
)
.await?;
info!(
"Emitting txns ran for {} secs(from {} to {}), stopping job...",
duration.as_secs(),
total_timing.start_unixtime_s,
total_timing.end_unixtime_s,
"Latency breakdown details for phase {}: from {} to {}: {:?}",
i, phase_timing[i].start_unixtime_s, phase_timing[i].end_unixtime_s, latency_breakdown
);
let stats_by_phase = job.stop_job().await;

info!("Stopped job");
info!("Warmup stats: {}", stats_by_phase[0].rate());

let mut stats: Option<TxnStats> = None;
let mut stats_by_phase_filtered = Vec::new();
for i in 0..stats_tracking_phases - 2 {
let next_i = i + 1;
let cur = &stats_by_phase[next_i];
info!("Test stats [test phase {}]: {}", i, cur.rate());
stats = if let Some(previous) = stats {
Some(&previous + cur)
} else {
Some(cur.clone())
};
let latency_breakdown = fetch_latency_breakdown(
ctx.swarm.clone(),
phase_timing[i].start_unixtime_s,
phase_timing[i].end_unixtime_s,
)
.await?;
info!(
"Latency breakdown details for phase {}: from {} to {}: {:?}",
i,
phase_timing[i].start_unixtime_s,
phase_timing[i].end_unixtime_s,
latency_breakdown
);
stats_by_phase_filtered.push(LoadTestPhaseStats {
emitter_stats: cur.clone(),
actual_duration: phase_timing[i].duration,
phase_start_unixtime_s: phase_timing[i].start_unixtime_s,
phase_end_unixtime_s: phase_timing[i].end_unixtime_s,
ledger_transactions: NetworkState::ledger_transactions(
&phase_start_network_state[i],
&phase_start_network_state[next_i],
),
latency_breakdown,
});
}
info!("Cooldown stats: {}", stats_by_phase.last().unwrap().rate());
stats_by_phase_filtered.push(LoadTestPhaseStats {
emitter_stats: cur.clone(),
actual_duration: phase_timing[i].duration,
phase_start_unixtime_s: phase_timing[i].start_unixtime_s,
phase_end_unixtime_s: phase_timing[i].end_unixtime_s,
ledger_transactions: NetworkState::ledger_transactions(
&phase_start_network_state[i],
&phase_start_network_state[next_i],
),
latency_breakdown,
});
}
info!("Cooldown stats: {}", stats_by_phase.last().unwrap().rate());

Ok(stats_by_phase_filtered)
Ok(stats_by_phase_filtered)
}

impl dyn NetworkLoadTest + '_ {
pub async fn network_load_test<'a>(
&self,
ctx: &mut NetworkContext<'a>,
emit_job_request: EmitJobRequest,
duration: Duration,
warmup_duration_fraction: f32,
cooldown_duration_fraction: f32,
) -> Result<Vec<LoadTestPhaseStats>> {
let destination = self.setup(ctx).await.context("setup NetworkLoadTest")?;
let nodes_to_send_load_to = destination.get_destination_nodes(ctx.swarm.clone()).await;

create_buffered_load(
ctx.swarm.clone(),
&nodes_to_send_load_to,
emit_job_request,
duration,
warmup_duration_fraction,
cooldown_duration_fraction,
Some((self, ctx.report)),
)
.await
}
}

Expand Down
Loading

0 comments on commit 6d3beea

Please sign in to comment.