Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-apply qs backpressure increase with buffer latency increase #13961

Merged
merged 3 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/src/config/mempool_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ impl Default for MempoolConfig {
system_transaction_timeout_secs: 600,
system_transaction_gc_interval_ms: 60_000,
broadcast_buckets: DEFAULT_BUCKETS.to_vec(),
eager_expire_threshold_ms: Some(10_000),
eager_expire_time_ms: 3_000,
eager_expire_threshold_ms: Some(15_000),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain the downside of increasing this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new constants make the logic like this:

if there is any transaction in the mempool (that never entered parking lot) that was there for longer than eager_expire_threshold_ms (which is increased from 10s -> 15s here) - that means we have a significant backlog.

if that is the case, we pull into batches only transactions that have at least eager_expire_time_ms time left (increased from 3s to 6s)

We've increased both, as with reduced QS backpressure, we see it taking more time from batch creation to batch being included in the block.

So the side-consequence of this change is that - if that during backlog, if you use expiration <6s, your transactions will be ignored (previously <3s).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me. Slightly orthogonal but do we really need this logic after @vusirikala 's change to exclude expired transaction in QS backpressure calculation in https://github.com/aptos-labs/aptos-core/pull/13850/files. cc - @bchocho

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

creating useless batches, and then requiring everyone to fetch them, just in order to throw them out - probably has negative effect on our overall throughput.

Copy link
Contributor

@vusirikala vusirikala Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sitalkedia This is the run for graceful overload test with my changes and increasing QS backpressure limits without changing the eager_expire_threshold_ms config.
#13964 (comment)

eager_expire_time_ms: 6_000,
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions config/src/config/quorum_store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ impl Default for QuorumStoreBackPressureConfig {
QuorumStoreBackPressureConfig {
// QS will be backpressured if the remaining total txns is more than this number
// Roughly, target TPS * commit latency seconds
backlog_txn_limit_count: 12_000,
backlog_txn_limit_count: 36_000,
// QS will create batches at the max rate until this number is reached
backlog_per_validator_batch_limit_count: 4,
backlog_per_validator_batch_limit_count: 20,
decrease_duration_ms: 1000,
increase_duration_ms: 1000,
decrease_fraction: 0.5,
dynamic_min_txn_per_s: 160,
dynamic_max_txn_per_s: 4000,
dynamic_max_txn_per_s: 12000,
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions testsuite/forge-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,10 +1118,10 @@ fn realistic_env_workload_sweep_test() -> ForgeConfig {
]),
// Investigate/improve to make latency more predictable on different workloads
criteria: [
(7700, 100, 0.3, 0.3, 0.5, 0.5),
(7000, 100, 0.3, 0.3, 0.5, 0.5),
(2000, 300, 0.3, 0.8, 0.6, 1.0),
(3200, 500, 0.3, 0.4, 0.7, 0.7),
(7700, 100, 0.3, 0.5, 0.5, 0.5),
(7000, 100, 0.3, 0.5, 0.5, 0.5),
(2000, 300, 0.3, 1.0, 0.6, 1.0),
(3200, 500, 0.3, 1.5, 0.7, 0.7),
// (150, 0.5, 1.0, 1.5, 0.65),
]
.into_iter()
Expand Down Expand Up @@ -1966,9 +1966,9 @@ fn realistic_env_max_load_test(
.add_system_metrics_threshold(SystemMetricsThreshold::new(
// Check that we don't use more than 18 CPU cores for 10% of the time.
MetricsThreshold::new(18.0, 10),
// Memory starts around 3GB, and grows around 1.2GB/hr in this test.
// Memory starts around 3.5GB, and grows around 1.4GB/hr in this test.
// Check that we don't use more than final expected memory for more than 10% of the time.
MetricsThreshold::new_gb(3.3 + 1.4 * (duration_secs as f64 / 3600.0), 10),
MetricsThreshold::new_gb(3.5 + 1.4 * (duration_secs as f64 / 3600.0), 10),
))
.add_no_restarts()
.add_wait_for_catchup_s(
Expand All @@ -1986,8 +1986,8 @@ fn realistic_env_max_load_test(
LatencyBreakdownThreshold::new_with_breach_pct(
vec![
(LatencyBreakdownSlice::QsBatchToPos, 0.35),
// only reaches close to threshold during epoch change
(LatencyBreakdownSlice::QsPosToProposal, 0.6),
// quorum store backpressure is relaxed, so queueing happens here
(LatencyBreakdownSlice::QsPosToProposal, 2.5),
// can be adjusted down if less backpressure
(LatencyBreakdownSlice::ConsensusProposalToOrdered, 0.85),
// can be adjusted down if less backpressure
Expand Down
284 changes: 150 additions & 134 deletions testsuite/testcases/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ use std::{
};
use tokio::runtime::{Handle, Runtime};

const WARMUP_DURATION_FRACTION: f32 = 0.07;
const COOLDOWN_DURATION_FRACTION: f32 = 0.04;
pub const WARMUP_DURATION_FRACTION: f32 = 0.07;
pub const COOLDOWN_DURATION_FRACTION: f32 = 0.04;

async fn batch_update(
ctx: &mut NetworkContext<'_>,
Expand Down Expand Up @@ -254,7 +254,6 @@ impl NetworkTest for dyn NetworkLoadTest {
.await
.context("no clients replied for start version")?;
let emit_job_request = ctx.emit_job.clone();
let rng = SeedableRng::from_rng(ctx.core().rng())?;
let duration = ctx.global_duration;
let stats_by_phase = self
.network_load_test(
Expand All @@ -263,7 +262,6 @@ impl NetworkTest for dyn NetworkLoadTest {
duration,
WARMUP_DURATION_FRACTION,
COOLDOWN_DURATION_FRACTION,
rng,
)
.await?;

Expand Down Expand Up @@ -328,155 +326,173 @@ impl NetworkTest for dyn NetworkLoadTest {
}
}

impl dyn NetworkLoadTest + '_ {
pub async fn network_load_test<'a>(
&self,
ctx: &mut NetworkContext<'a>,
emit_job_request: EmitJobRequest,
duration: Duration,
warmup_duration_fraction: f32,
cooldown_duration_fraction: f32,
rng: StdRng,
) -> Result<Vec<LoadTestPhaseStats>> {
let destination = self.setup(ctx).await.context("setup NetworkLoadTest")?;
let nodes_to_send_load_to = destination.get_destination_nodes(ctx.swarm.clone()).await;
pub async fn create_buffered_load(
swarm: Arc<tokio::sync::RwLock<Box<dyn Swarm>>>,
nodes_to_send_load_to: &[PeerId],
emit_job_request: EmitJobRequest,
duration: Duration,
warmup_duration_fraction: f32,
cooldown_duration_fraction: f32,
mut inner_test_and_report: Option<(&dyn NetworkLoadTest, &mut TestReport)>,
) -> Result<Vec<LoadTestPhaseStats>> {
// Generate some traffic
let (mut emitter, emit_job_request) = create_emitter_and_request(
swarm.clone(),
emit_job_request,
nodes_to_send_load_to,
StdRng::from_entropy(),
)
.await
.context("create emitter")?;

let clients = swarm
.read()
.await
.get_clients_for_peers(nodes_to_send_load_to, Duration::from_secs(10));

// Generate some traffic
let mut stats_tracking_phases = emit_job_request.get_num_phases();
assert!(stats_tracking_phases > 0 && stats_tracking_phases != 2);
if stats_tracking_phases == 1 {
stats_tracking_phases = 3;
}

let (mut emitter, emit_job_request) = create_emitter_and_request(
ctx.swarm.clone(),
info!("Starting emitting txns for {}s", duration.as_secs());
let mut job = emitter
.start_job(
swarm.read().await.chain_info().root_account,
emit_job_request,
&nodes_to_send_load_to,
rng,
stats_tracking_phases,
)
.await
.context("create emitter")?;
.context("start emitter job")?;

let clients = ctx
.swarm
.read()
.await
.get_clients_for_peers(&nodes_to_send_load_to, Duration::from_secs(10));

let mut stats_tracking_phases = emit_job_request.get_num_phases();
assert!(stats_tracking_phases > 0 && stats_tracking_phases != 2);
if stats_tracking_phases == 1 {
stats_tracking_phases = 3;
}

info!("Starting emitting txns for {}s", duration.as_secs());
let mut job = emitter
.start_job(
ctx.swarm.read().await.chain_info().root_account,
emit_job_request,
stats_tracking_phases,
)
.await
.context("start emitter job")?;

let total_start = PhaseTimingStart::now();
let total_start = PhaseTimingStart::now();

let warmup_duration = duration.mul_f32(warmup_duration_fraction);
let cooldown_duration = duration.mul_f32(cooldown_duration_fraction);
let test_duration = duration - warmup_duration - cooldown_duration;
let phase_duration = test_duration.div_f32((stats_tracking_phases - 2) as f32);
let warmup_duration = duration.mul_f32(warmup_duration_fraction);
let cooldown_duration = duration.mul_f32(cooldown_duration_fraction);
let test_duration = duration - warmup_duration - cooldown_duration;
let phase_duration = test_duration.div_f32((stats_tracking_phases - 2) as f32);

job = job.periodic_stat_forward(warmup_duration, 60).await;
info!("{}s warmup finished", warmup_duration.as_secs());
job = job.periodic_stat_forward(warmup_duration, 60).await;
info!("{}s warmup finished", warmup_duration.as_secs());

let mut phase_timing = Vec::new();
let mut phase_start_network_state = Vec::new();
let test_start = Instant::now();
for i in 0..stats_tracking_phases - 2 {
phase_start_network_state.push(NetworkState::new(&clients).await);
job.start_next_phase();
let mut phase_timing = Vec::new();
let mut phase_start_network_state = Vec::new();
let test_start = Instant::now();
for i in 0..stats_tracking_phases - 2 {
phase_start_network_state.push(NetworkState::new(&clients).await);
job.start_next_phase();

if i > 0 {
info!(
"Starting test phase {} out of {}",
i,
stats_tracking_phases - 2,
);
}
let phase_start = PhaseTimingStart::now();
if i > 0 {
info!(
"Starting test phase {} out of {}",
i,
stats_tracking_phases - 2,
);
}
let phase_start = PhaseTimingStart::now();

let join_stats = Handle::current().spawn(job.periodic_stat_forward(phase_duration, 60));
self.test(ctx.swarm.clone(), ctx.report, phase_duration)
let join_stats = Handle::current().spawn(job.periodic_stat_forward(phase_duration, 60));
if let Some((inner_test, context)) = inner_test_and_report.as_mut() {
inner_test
.test(swarm.clone(), context, phase_duration)
.await
.context("test NetworkLoadTest")?;
job = join_stats.await.context("join stats")?;
phase_timing.push(phase_start.elapsed());
}
let actual_test_duration = test_start.elapsed();
info!(
"{}s test finished after {}s",
test_duration.as_secs(),
actual_test_duration.as_secs()
);

phase_start_network_state.push(NetworkState::new(&clients).await);
job.start_next_phase();
let cooldown_start = Instant::now();
job = join_stats.await.context("join stats")?;
phase_timing.push(phase_start.elapsed());
}
let actual_test_duration = test_start.elapsed();
info!(
"{}s test finished after {}s",
test_duration.as_secs(),
actual_test_duration.as_secs()
);

let cooldown_used = cooldown_start.elapsed();
if cooldown_used < cooldown_duration {
job = job
.periodic_stat_forward(cooldown_duration - cooldown_used, 60)
.await;
}
info!("{}s cooldown finished", cooldown_duration.as_secs());
phase_start_network_state.push(NetworkState::new(&clients).await);
job.start_next_phase();
let cooldown_start = Instant::now();

let total_timing = total_start.elapsed();
let cooldown_used = cooldown_start.elapsed();
if cooldown_used < cooldown_duration {
job = job
.periodic_stat_forward(cooldown_duration - cooldown_used, 60)
.await;
}
info!("{}s cooldown finished", cooldown_duration.as_secs());

let total_timing = total_start.elapsed();
info!(
"Emitting txns ran for {} secs(from {} to {}), stopping job...",
duration.as_secs(),
total_timing.start_unixtime_s,
total_timing.end_unixtime_s,
);
let stats_by_phase = job.stop_job().await;

info!("Stopped job");
info!("Warmup stats: {}", stats_by_phase[0].rate());

let mut stats: Option<TxnStats> = None;
let mut stats_by_phase_filtered = Vec::new();
for i in 0..stats_tracking_phases - 2 {
let next_i = i + 1;
let cur = &stats_by_phase[next_i];
info!("Test stats [test phase {}]: {}", i, cur.rate());
stats = if let Some(previous) = stats {
Some(&previous + cur)
} else {
Some(cur.clone())
};
let latency_breakdown = fetch_latency_breakdown(
swarm.clone(),
phase_timing[i].start_unixtime_s,
phase_timing[i].end_unixtime_s,
)
.await?;
info!(
"Emitting txns ran for {} secs(from {} to {}), stopping job...",
duration.as_secs(),
total_timing.start_unixtime_s,
total_timing.end_unixtime_s,
"Latency breakdown details for phase {}: from {} to {}: {:?}",
i, phase_timing[i].start_unixtime_s, phase_timing[i].end_unixtime_s, latency_breakdown
);
let stats_by_phase = job.stop_job().await;

info!("Stopped job");
info!("Warmup stats: {}", stats_by_phase[0].rate());

let mut stats: Option<TxnStats> = None;
let mut stats_by_phase_filtered = Vec::new();
for i in 0..stats_tracking_phases - 2 {
let next_i = i + 1;
let cur = &stats_by_phase[next_i];
info!("Test stats [test phase {}]: {}", i, cur.rate());
stats = if let Some(previous) = stats {
Some(&previous + cur)
} else {
Some(cur.clone())
};
let latency_breakdown = fetch_latency_breakdown(
ctx.swarm.clone(),
phase_timing[i].start_unixtime_s,
phase_timing[i].end_unixtime_s,
)
.await?;
info!(
"Latency breakdown details for phase {}: from {} to {}: {:?}",
i,
phase_timing[i].start_unixtime_s,
phase_timing[i].end_unixtime_s,
latency_breakdown
);
stats_by_phase_filtered.push(LoadTestPhaseStats {
emitter_stats: cur.clone(),
actual_duration: phase_timing[i].duration,
phase_start_unixtime_s: phase_timing[i].start_unixtime_s,
phase_end_unixtime_s: phase_timing[i].end_unixtime_s,
ledger_transactions: NetworkState::ledger_transactions(
&phase_start_network_state[i],
&phase_start_network_state[next_i],
),
latency_breakdown,
});
}
info!("Cooldown stats: {}", stats_by_phase.last().unwrap().rate());
stats_by_phase_filtered.push(LoadTestPhaseStats {
emitter_stats: cur.clone(),
actual_duration: phase_timing[i].duration,
phase_start_unixtime_s: phase_timing[i].start_unixtime_s,
phase_end_unixtime_s: phase_timing[i].end_unixtime_s,
ledger_transactions: NetworkState::ledger_transactions(
&phase_start_network_state[i],
&phase_start_network_state[next_i],
),
latency_breakdown,
});
}
info!("Cooldown stats: {}", stats_by_phase.last().unwrap().rate());

Ok(stats_by_phase_filtered)
Ok(stats_by_phase_filtered)
}

impl dyn NetworkLoadTest + '_ {
pub async fn network_load_test<'a>(
&self,
ctx: &mut NetworkContext<'a>,
emit_job_request: EmitJobRequest,
duration: Duration,
warmup_duration_fraction: f32,
cooldown_duration_fraction: f32,
) -> Result<Vec<LoadTestPhaseStats>> {
let destination = self.setup(ctx).await.context("setup NetworkLoadTest")?;
let nodes_to_send_load_to = destination.get_destination_nodes(ctx.swarm.clone()).await;

create_buffered_load(
ctx.swarm.clone(),
&nodes_to_send_load_to,
emit_job_request,
duration,
warmup_duration_fraction,
cooldown_duration_fraction,
Some((self, ctx.report)),
)
.await
}
}

Expand Down
Loading
Loading