Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a window in batch generator to calculate num txns to be pulled from mempool #14043

Closed
wants to merge 13 commits into from
5 changes: 5 additions & 0 deletions config/src/config/quorum_store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub struct QuorumStoreConfig {
pub channel_size: usize,
pub proof_timeout_ms: usize,
pub batch_generation_poll_interval_ms: usize,
// No longer being used
pub batch_generation_min_non_empty_interval_ms: usize,
pub batch_generation_max_interval_ms: usize,
/// The maximum number of transactions that the batch generator puts in a batch.
Expand Down Expand Up @@ -91,6 +92,9 @@ pub struct QuorumStoreConfig {
pub num_workers_for_remote_batches: usize,
pub batch_buckets: Vec<u64>,
pub allow_batches_without_pos_in_proposal: bool,
/// If batch generator pulls lesser than `minimum_batch_size` transactions from mempool, batch generator will
/// wait for more transactions to be available in mempool before creating a batch.
pub minimum_batch_size: u64,
}

impl Default for QuorumStoreConfig {
Expand Down Expand Up @@ -129,6 +133,7 @@ impl Default for QuorumStoreConfig {
num_workers_for_remote_batches: 10,
batch_buckets: DEFAULT_BUCKETS.to_vec(),
allow_batches_without_pos_in_proposal: true,
minimum_batch_size: 20,
}
}
}
Expand Down
36 changes: 27 additions & 9 deletions consensus/src/quorum_store/batch_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use aptos_types::{transaction::SignedTransaction, PeerId};
use futures_channel::mpsc::Sender;
use rayon::prelude::*;
use std::{
collections::{btree_map::Entry, BTreeMap, HashMap},
collections::{btree_map::Entry, BTreeMap, HashMap, VecDeque},
sync::Arc,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -402,6 +402,8 @@ impl BatchGenerator {
let mut dynamic_pull_txn_per_s = (self.config.back_pressure.dynamic_min_txn_per_s
+ self.config.back_pressure.dynamic_max_txn_per_s)
/ 2;
// The number of transactions pulled from mempool in the past second
let mut pulled_txns_window: VecDeque<(Instant, u64)> = VecDeque::new();

loop {
let _timer = counters::BATCH_GENERATOR_MAIN_LOOP.start_timer();
Expand Down Expand Up @@ -448,24 +450,40 @@ impl BatchGenerator {
} else {
counters::QS_BACKPRESSURE_PROOF_COUNT.observe(0.0);
}

let since_last_non_empty_pull_ms = std::cmp::min(
tick_start.duration_since(last_non_empty_pull).as_millis(),
self.config.batch_generation_max_interval_ms as u128
) as usize;
if (!self.back_pressure.proof_count
&& since_last_non_empty_pull_ms >= self.config.batch_generation_min_non_empty_interval_ms)
|| since_last_non_empty_pull_ms == self.config.batch_generation_max_interval_ms {

let dynamic_pull_max_txn = std::cmp::max(
(since_last_non_empty_pull_ms as f64 / 1000.0 * dynamic_pull_txn_per_s as f64) as u64, 1);
// If there is txn/proof backpressure, pull once every 250ms.
// If there is no backpressure, pull every 25ms.
if (!self.back_pressure.proof_count && !self.back_pressure.txn_count)
|| since_last_non_empty_pull_ms >= self.config.batch_generation_max_interval_ms {

while let Some(&(timestamp, _)) = pulled_txns_window.front() {
if timestamp.elapsed() >= Duration::from_secs(1) {
pulled_txns_window.pop_front();
} else {
break;
}
}

let pulled_in_last_sec = pulled_txns_window.iter().map(|(_, txns)| txns).sum();
let pull_max_txn = std::cmp::min(
dynamic_pull_max_txn,
std::cmp::max(dynamic_pull_txn_per_s.saturating_sub(pulled_in_last_sec), 1),
self.config.sender_max_total_txns as u64,
);
let batches = self.handle_scheduled_pull(pull_max_txn).await;
if !batches.is_empty() {
last_non_empty_pull = tick_start;
let num_txns_in_batches: u64 = batches.iter().map(|b| b.batch_info().num_txns()).sum();
info!("pulled_txns_window: {:?}, current_time: {:?}, pulled_in_last_sec: {}, pull_max_txn: {}, batch size: {}, num_txns_in_batches: {}", pulled_txns_window, Instant::now(), pulled_in_last_sec, pull_max_txn, batches.len(), num_txns_in_batches);

// Ignore the batch if batch is smaller than minimum batch size and pull again in 25ms
// If a batch hasn't been created for a while, send it out even if it's smaller than minimum batch size
if num_txns_in_batches as u64 >= self.config.minimum_batch_size
|| (!batches.is_empty() && since_last_non_empty_pull_ms >= self.config.batch_generation_min_non_empty_interval_ms) {
last_non_empty_pull = tick_start;
pulled_txns_window.push_back((tick_start, num_txns_in_batches as u64));
let persist_start = Instant::now();
let mut persist_requests = vec![];
for batch in batches.clone().into_iter() {
Expand Down
Loading