diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index 6e442023b1ffa..8a30b918bdae8 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -47,6 +47,7 @@ pub struct QuorumStoreConfig { pub channel_size: usize, pub proof_timeout_ms: usize, pub batch_generation_poll_interval_ms: usize, + // No longer being used pub batch_generation_min_non_empty_interval_ms: usize, pub batch_generation_max_interval_ms: usize, /// The maximum number of transactions that the batch generator puts in a batch. @@ -91,6 +92,9 @@ pub struct QuorumStoreConfig { pub num_workers_for_remote_batches: usize, pub batch_buckets: Vec, pub allow_batches_without_pos_in_proposal: bool, + /// If batch generator pulls lesser than `minimum_batch_size` transactions from mempool, batch generator will + /// wait for more transactions to be available in mempool before creating a batch. + pub minimum_batch_size: u64, } impl Default for QuorumStoreConfig { @@ -129,6 +133,7 @@ impl Default for QuorumStoreConfig { num_workers_for_remote_batches: 10, batch_buckets: DEFAULT_BUCKETS.to_vec(), allow_batches_without_pos_in_proposal: true, + minimum_batch_size: 20, } } } diff --git a/consensus/src/quorum_store/batch_generator.rs b/consensus/src/quorum_store/batch_generator.rs index b163709a74b5e..860d39e8b131a 100644 --- a/consensus/src/quorum_store/batch_generator.rs +++ b/consensus/src/quorum_store/batch_generator.rs @@ -23,7 +23,7 @@ use aptos_types::{transaction::SignedTransaction, PeerId}; use futures_channel::mpsc::Sender; use rayon::prelude::*; use std::{ - collections::{btree_map::Entry, BTreeMap, HashMap}, + collections::{btree_map::Entry, BTreeMap, HashMap, VecDeque}, sync::Arc, time::{Duration, Instant}, }; @@ -402,6 +402,8 @@ impl BatchGenerator { let mut dynamic_pull_txn_per_s = (self.config.back_pressure.dynamic_min_txn_per_s + self.config.back_pressure.dynamic_max_txn_per_s) / 2; + // The number of transactions pulled from mempool in the past second + let mut pulled_txns_window: VecDeque<(Instant, u64)> = VecDeque::new(); loop { let _timer = counters::BATCH_GENERATOR_MAIN_LOOP.start_timer(); @@ -448,24 +450,40 @@ impl BatchGenerator { } else { counters::QS_BACKPRESSURE_PROOF_COUNT.observe(0.0); } + let since_last_non_empty_pull_ms = std::cmp::min( tick_start.duration_since(last_non_empty_pull).as_millis(), self.config.batch_generation_max_interval_ms as u128 ) as usize; - if (!self.back_pressure.proof_count - && since_last_non_empty_pull_ms >= self.config.batch_generation_min_non_empty_interval_ms) - || since_last_non_empty_pull_ms == self.config.batch_generation_max_interval_ms { - let dynamic_pull_max_txn = std::cmp::max( - (since_last_non_empty_pull_ms as f64 / 1000.0 * dynamic_pull_txn_per_s as f64) as u64, 1); + // If there is txn/proof backpressure, pull once every 250ms. + // If there is no backpressure, pull every 25ms. + if (!self.back_pressure.proof_count && !self.back_pressure.txn_count) + || since_last_non_empty_pull_ms >= self.config.batch_generation_max_interval_ms { + + while let Some(&(timestamp, _)) = pulled_txns_window.front() { + if timestamp.elapsed() >= Duration::from_secs(1) { + pulled_txns_window.pop_front(); + } else { + break; + } + } + + let pulled_in_last_sec = pulled_txns_window.iter().map(|(_, txns)| txns).sum(); let pull_max_txn = std::cmp::min( - dynamic_pull_max_txn, + std::cmp::max(dynamic_pull_txn_per_s.saturating_sub(pulled_in_last_sec), 1), self.config.sender_max_total_txns as u64, ); let batches = self.handle_scheduled_pull(pull_max_txn).await; - if !batches.is_empty() { - last_non_empty_pull = tick_start; + let num_txns_in_batches: u64 = batches.iter().map(|b| b.batch_info().num_txns()).sum(); + info!("pulled_txns_window: {:?}, current_time: {:?}, pulled_in_last_sec: {}, pull_max_txn: {}, batch size: {}, num_txns_in_batches: {}", pulled_txns_window, Instant::now(), pulled_in_last_sec, pull_max_txn, batches.len(), num_txns_in_batches); + // Ignore the batch if batch is smaller than minimum batch size and pull again in 25ms + // If a batch hasn't been created for a while, send it out even if it's smaller than minimum batch size + if num_txns_in_batches as u64 >= self.config.minimum_batch_size + || (!batches.is_empty() && since_last_non_empty_pull_ms >= self.config.batch_generation_min_non_empty_interval_ms) { + last_non_empty_pull = tick_start; + pulled_txns_window.push_back((tick_start, num_txns_in_batches as u64)); let persist_start = Instant::now(); let mut persist_requests = vec![]; for batch in batches.clone().into_iter() {