From 1e4834e360aa45991a6247f5b057b8ac0873dc9d Mon Sep 17 00:00:00 2001 From: Satya Vusirikala Date: Thu, 18 Jul 2024 13:32:25 -0700 Subject: [PATCH 1/9] Use mempool window in batch generator --- config/src/config/quorum_store_config.rs | 6 +++- consensus/src/quorum_store/batch_generator.rs | 33 +++++++++++++++---- 2 files changed, 31 insertions(+), 8 deletions(-) diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index 6e442023b1ffa..fea31dd1f9faa 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -91,6 +91,9 @@ pub struct QuorumStoreConfig { pub num_workers_for_remote_batches: usize, pub batch_buckets: Vec, pub allow_batches_without_pos_in_proposal: bool, + /// If batch generator pulls lesser than `minimum_batch_size` transactions from mempool, batch generator will + /// wait for more transactions to be available in mempool before creating a batch. + pub minimum_batch_size: u64, } impl Default for QuorumStoreConfig { @@ -99,7 +102,7 @@ impl Default for QuorumStoreConfig { channel_size: 1000, proof_timeout_ms: 10000, batch_generation_poll_interval_ms: 25, - batch_generation_min_non_empty_interval_ms: 200, + batch_generation_min_non_empty_interval_ms: 50, batch_generation_max_interval_ms: 250, sender_max_batch_txns: 250, // TODO: on next release, remove BATCH_PADDING_BYTES @@ -129,6 +132,7 @@ impl Default for QuorumStoreConfig { num_workers_for_remote_batches: 10, batch_buckets: DEFAULT_BUCKETS.to_vec(), allow_batches_without_pos_in_proposal: true, + minimum_batch_size: 5, } } } diff --git a/consensus/src/quorum_store/batch_generator.rs b/consensus/src/quorum_store/batch_generator.rs index b163709a74b5e..5b6c17d05e206 100644 --- a/consensus/src/quorum_store/batch_generator.rs +++ b/consensus/src/quorum_store/batch_generator.rs @@ -23,7 +23,7 @@ use aptos_types::{transaction::SignedTransaction, PeerId}; use futures_channel::mpsc::Sender; use rayon::prelude::*; use std::{ - collections::{btree_map::Entry, BTreeMap, HashMap}, + collections::{btree_map::Entry, BTreeMap, HashMap, VecDeque}, sync::Arc, time::{Duration, Instant}, }; @@ -402,6 +402,8 @@ impl BatchGenerator { let mut dynamic_pull_txn_per_s = (self.config.back_pressure.dynamic_min_txn_per_s + self.config.back_pressure.dynamic_max_txn_per_s) / 2; + // The number of transactions pulled from mempool in the past second + let mut pulled_txns_window: VecDeque<(Instant, u64)> = VecDeque::new(); loop { let _timer = counters::BATCH_GENERATOR_MAIN_LOOP.start_timer(); @@ -448,22 +450,39 @@ impl BatchGenerator { } else { counters::QS_BACKPRESSURE_PROOF_COUNT.observe(0.0); } + let since_last_non_empty_pull_ms = std::cmp::min( tick_start.duration_since(last_non_empty_pull).as_millis(), self.config.batch_generation_max_interval_ms as u128 ) as usize; - if (!self.back_pressure.proof_count - && since_last_non_empty_pull_ms >= self.config.batch_generation_min_non_empty_interval_ms) - || since_last_non_empty_pull_ms == self.config.batch_generation_max_interval_ms { - let dynamic_pull_max_txn = std::cmp::max( - (since_last_non_empty_pull_ms as f64 / 1000.0 * dynamic_pull_txn_per_s as f64) as u64, 1); + // If there is txn/proof backpressure, pull once every 250ms. + // If there is no backpressure, pull every 25ms. + if (!self.back_pressure.proof_count && !self.back_pressure.txn_count) + || since_last_non_empty_pull_ms >= self.config.batch_generation_max_interval_ms { + + while let Some(&(tick_start, _)) = pulled_txns_window.front() { + if tick_start.elapsed() >= Duration::from_secs(1) { + pulled_txns_window.pop_front(); + } else { + break; + } + } + + let pulled_in_last_sec = pulled_txns_window.iter().map(|(_, txns)| txns).sum(); + let dynamic_pull_max_txn = dynamic_pull_txn_per_s.saturating_sub(pulled_in_last_sec); let pull_max_txn = std::cmp::min( dynamic_pull_max_txn, self.config.sender_max_total_txns as u64, ); let batches = self.handle_scheduled_pull(pull_max_txn).await; - if !batches.is_empty() { + + info!("pulled_txns_window: {:?}, current_time: {:?}, pulled_in_last_sec: {}, pull_max_txn: {}, batch size: {}", pulled_txns_window, Instant::now(), pulled_in_last_sec, pull_max_txn, batches.len()); + + // Ignore the batch if batch is smaller than minimum batch size and pull again in 25ms + // If a batch hasn't been created for a while, send it out even if it's smaller than minimum batch size + if batches.len() as u64 >= self.config.minimum_batch_size + || (!batches.is_empty() && since_last_non_empty_pull_ms >= self.config.batch_generation_max_interval_ms) { last_non_empty_pull = tick_start; let persist_start = Instant::now(); From e4b1c0707f6f571071616ba12303e1748539de4b Mon Sep 17 00:00:00 2001 From: Satya Vusirikala Date: Thu, 18 Jul 2024 13:35:22 -0700 Subject: [PATCH 2/9] Minor change --- config/src/config/quorum_store_config.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index fea31dd1f9faa..c9856a4e7a73f 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -47,6 +47,7 @@ pub struct QuorumStoreConfig { pub channel_size: usize, pub proof_timeout_ms: usize, pub batch_generation_poll_interval_ms: usize, + // No longer being used pub batch_generation_min_non_empty_interval_ms: usize, pub batch_generation_max_interval_ms: usize, /// The maximum number of transactions that the batch generator puts in a batch. @@ -102,7 +103,7 @@ impl Default for QuorumStoreConfig { channel_size: 1000, proof_timeout_ms: 10000, batch_generation_poll_interval_ms: 25, - batch_generation_min_non_empty_interval_ms: 50, + batch_generation_min_non_empty_interval_ms: 200, batch_generation_max_interval_ms: 250, sender_max_batch_txns: 250, // TODO: on next release, remove BATCH_PADDING_BYTES From ef47712872ec8ebe4aeb87c7f9241b6ca6a98a39 Mon Sep 17 00:00:00 2001 From: Satya Vusirikala Date: Thu, 18 Jul 2024 14:14:26 -0700 Subject: [PATCH 3/9] Minor fix --- config/src/config/quorum_store_config.rs | 2 +- consensus/src/quorum_store/batch_generator.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index c9856a4e7a73f..297b1c9589be2 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -109,7 +109,7 @@ impl Default for QuorumStoreConfig { // TODO: on next release, remove BATCH_PADDING_BYTES sender_max_batch_bytes: 1024 * 1024 - BATCH_PADDING_BYTES, sender_max_num_batches: DEFAULT_MAX_NUM_BATCHES, - sender_max_total_txns: 2000, + sender_max_total_txns: 2500, // TODO: on next release, remove DEFAULT_MAX_NUM_BATCHES * BATCH_PADDING_BYTES sender_max_total_bytes: 4 * 1024 * 1024 - DEFAULT_MAX_NUM_BATCHES * BATCH_PADDING_BYTES, receiver_max_batch_txns: 250, diff --git a/consensus/src/quorum_store/batch_generator.rs b/consensus/src/quorum_store/batch_generator.rs index 5b6c17d05e206..3930cbc5887df 100644 --- a/consensus/src/quorum_store/batch_generator.rs +++ b/consensus/src/quorum_store/batch_generator.rs @@ -484,7 +484,7 @@ impl BatchGenerator { if batches.len() as u64 >= self.config.minimum_batch_size || (!batches.is_empty() && since_last_non_empty_pull_ms >= self.config.batch_generation_max_interval_ms) { last_non_empty_pull = tick_start; - + pulled_txns_window.push_back((tick_start, batches.len() as u64)); let persist_start = Instant::now(); let mut persist_requests = vec![]; for batch in batches.clone().into_iter() { From 9bfe18bb5aa53ef4a76d4e283aec574a2cc578de Mon Sep 17 00:00:00 2001 From: Satya Vusirikala Date: Thu, 18 Jul 2024 15:10:14 -0700 Subject: [PATCH 4/9] Minor change --- consensus/src/quorum_store/batch_generator.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/consensus/src/quorum_store/batch_generator.rs b/consensus/src/quorum_store/batch_generator.rs index 3930cbc5887df..6158c21a12fbb 100644 --- a/consensus/src/quorum_store/batch_generator.rs +++ b/consensus/src/quorum_store/batch_generator.rs @@ -470,9 +470,8 @@ impl BatchGenerator { } let pulled_in_last_sec = pulled_txns_window.iter().map(|(_, txns)| txns).sum(); - let dynamic_pull_max_txn = dynamic_pull_txn_per_s.saturating_sub(pulled_in_last_sec); let pull_max_txn = std::cmp::min( - dynamic_pull_max_txn, + std::cmp::max(dynamic_pull_txn_per_s.saturating_sub(pulled_in_last_sec), 1), self.config.sender_max_total_txns as u64, ); let batches = self.handle_scheduled_pull(pull_max_txn).await; From e352eb23a7ae3466fdf72dd4783dcfc1d40a3f96 Mon Sep 17 00:00:00 2001 From: Satya Vusirikala Date: Thu, 18 Jul 2024 15:44:34 -0700 Subject: [PATCH 5/9] Minor fix --- consensus/src/quorum_store/batch_generator.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/consensus/src/quorum_store/batch_generator.rs b/consensus/src/quorum_store/batch_generator.rs index 6158c21a12fbb..1871b0596a56b 100644 --- a/consensus/src/quorum_store/batch_generator.rs +++ b/consensus/src/quorum_store/batch_generator.rs @@ -475,15 +475,15 @@ impl BatchGenerator { self.config.sender_max_total_txns as u64, ); let batches = self.handle_scheduled_pull(pull_max_txn).await; - - info!("pulled_txns_window: {:?}, current_time: {:?}, pulled_in_last_sec: {}, pull_max_txn: {}, batch size: {}", pulled_txns_window, Instant::now(), pulled_in_last_sec, pull_max_txn, batches.len()); + let num_txns_in_batches: u64 = batches.iter().map(|b| b.batch_info().num_txns()).sum(); + info!("pulled_txns_window: {:?}, current_time: {:?}, pulled_in_last_sec: {}, pull_max_txn: {}, batch size: {}, num_txns_in_batches: {}", pulled_txns_window, Instant::now(), pulled_in_last_sec, pull_max_txn, batches.len(), num_txns_in_batches); // Ignore the batch if batch is smaller than minimum batch size and pull again in 25ms // If a batch hasn't been created for a while, send it out even if it's smaller than minimum batch size - if batches.len() as u64 >= self.config.minimum_batch_size + if num_txns_in_batches as u64 >= self.config.minimum_batch_size || (!batches.is_empty() && since_last_non_empty_pull_ms >= self.config.batch_generation_max_interval_ms) { last_non_empty_pull = tick_start; - pulled_txns_window.push_back((tick_start, batches.len() as u64)); + pulled_txns_window.push_back((tick_start, num_txns_in_batches as u64)); let persist_start = Instant::now(); let mut persist_requests = vec![]; for batch in batches.clone().into_iter() { From 245838a302e9ffe41efce7c29369b8013cc41e63 Mon Sep 17 00:00:00 2001 From: Satya Vusirikala Date: Mon, 22 Jul 2024 14:07:54 -0700 Subject: [PATCH 6/9] Minor change --- consensus/src/quorum_store/batch_generator.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consensus/src/quorum_store/batch_generator.rs b/consensus/src/quorum_store/batch_generator.rs index 1871b0596a56b..99951a791884b 100644 --- a/consensus/src/quorum_store/batch_generator.rs +++ b/consensus/src/quorum_store/batch_generator.rs @@ -461,8 +461,8 @@ impl BatchGenerator { if (!self.back_pressure.proof_count && !self.back_pressure.txn_count) || since_last_non_empty_pull_ms >= self.config.batch_generation_max_interval_ms { - while let Some(&(tick_start, _)) = pulled_txns_window.front() { - if tick_start.elapsed() >= Duration::from_secs(1) { + while let Some(&(timestamp, _)) = pulled_txns_window.front() { + if timestamp.elapsed() >= Duration::from_secs(1) { pulled_txns_window.pop_front(); } else { break; From 6497dc177ccc8f18d0f64a88171fffd969b05427 Mon Sep 17 00:00:00 2001 From: Satya Vusirikala Date: Mon, 22 Jul 2024 14:44:12 -0700 Subject: [PATCH 7/9] Resetting config --- config/src/config/quorum_store_config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index 297b1c9589be2..c9856a4e7a73f 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -109,7 +109,7 @@ impl Default for QuorumStoreConfig { // TODO: on next release, remove BATCH_PADDING_BYTES sender_max_batch_bytes: 1024 * 1024 - BATCH_PADDING_BYTES, sender_max_num_batches: DEFAULT_MAX_NUM_BATCHES, - sender_max_total_txns: 2500, + sender_max_total_txns: 2000, // TODO: on next release, remove DEFAULT_MAX_NUM_BATCHES * BATCH_PADDING_BYTES sender_max_total_bytes: 4 * 1024 * 1024 - DEFAULT_MAX_NUM_BATCHES * BATCH_PADDING_BYTES, receiver_max_batch_txns: 250, From 32c06b8c299a01ff6a7beef8c1e62890e58120e3 Mon Sep 17 00:00:00 2001 From: Satya Vusirikala Date: Mon, 22 Jul 2024 15:54:51 -0700 Subject: [PATCH 8/9] Changed config --- config/src/config/quorum_store_config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index c9856a4e7a73f..8a30b918bdae8 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -133,7 +133,7 @@ impl Default for QuorumStoreConfig { num_workers_for_remote_batches: 10, batch_buckets: DEFAULT_BUCKETS.to_vec(), allow_batches_without_pos_in_proposal: true, - minimum_batch_size: 5, + minimum_batch_size: 20, } } } From ed25b3c3e51b10fb2ce7d328952a8253fe4279a6 Mon Sep 17 00:00:00 2001 From: Satya Vusirikala Date: Mon, 22 Jul 2024 15:56:51 -0700 Subject: [PATCH 9/9] Updated configs --- consensus/src/quorum_store/batch_generator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/src/quorum_store/batch_generator.rs b/consensus/src/quorum_store/batch_generator.rs index 99951a791884b..860d39e8b131a 100644 --- a/consensus/src/quorum_store/batch_generator.rs +++ b/consensus/src/quorum_store/batch_generator.rs @@ -481,7 +481,7 @@ impl BatchGenerator { // Ignore the batch if batch is smaller than minimum batch size and pull again in 25ms // If a batch hasn't been created for a while, send it out even if it's smaller than minimum batch size if num_txns_in_batches as u64 >= self.config.minimum_batch_size - || (!batches.is_empty() && since_last_non_empty_pull_ms >= self.config.batch_generation_max_interval_ms) { + || (!batches.is_empty() && since_last_non_empty_pull_ms >= self.config.batch_generation_min_non_empty_interval_ms) { last_non_empty_pull = tick_start; pulled_txns_window.push_back((tick_start, num_txns_in_batches as u64)); let persist_start = Instant::now();