From d6720851a312e5af6e33a73d109e181a4108594e Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Fri, 22 Nov 2024 14:16:34 -0800 Subject: [PATCH] [consensus] sync improvements to help slow nodes sync better (#15364) * [qs] grace period before GC committed batches * [consensus] trigger sync based on remote LI timestamp --- consensus/src/block_storage/sync_manager.rs | 7 +++++++ consensus/src/quorum_store/batch_store.rs | 11 +++++++++-- consensus/src/quorum_store/quorum_store_builder.rs | 1 + consensus/src/quorum_store/tests/batch_store_test.rs | 1 + .../forge-cli/src/suites/realistic_environment.rs | 6 +++--- 5 files changed, 21 insertions(+), 5 deletions(-) diff --git a/consensus/src/block_storage/sync_manager.rs b/consensus/src/block_storage/sync_manager.rs index 07b20edf8a922..3f9e917479bad 100644 --- a/consensus/src/block_storage/sync_manager.rs +++ b/consensus/src/block_storage/sync_manager.rs @@ -68,6 +68,13 @@ impl BlockStore { && !self.block_exists(li.commit_info().id())) || self.commit_root().round() + 30.max(2 * self.vote_back_pressure_limit) < li.commit_info().round() + // If the LI commit block timestamp is more than 30 secs ahead of self commit block + // timestamp, sync to the ledger info + || li + .commit_info() + .timestamp_usecs() + .saturating_sub(self.commit_root().timestamp_usecs()) + >= Duration::from_secs(30).as_micros() as u64 } /// Checks if quorum certificate can be inserted in block store without RPC diff --git a/consensus/src/quorum_store/batch_store.rs b/consensus/src/quorum_store/batch_store.rs index f8d32b23137ff..74720737dd278 100644 --- a/consensus/src/quorum_store/batch_store.rs +++ b/consensus/src/quorum_store/batch_store.rs @@ -117,6 +117,7 @@ pub struct BatchStore { batch_quota: usize, validator_signer: ValidatorSigner, persist_subscribers: DashMap>>, + expiration_buffer_usecs: u64, } impl BatchStore { @@ -128,6 +129,7 @@ impl BatchStore { db_quota: usize, batch_quota: usize, validator_signer: ValidatorSigner, + 
expiration_buffer_usecs: u64, ) -> Self { let db_clone = db.clone(); let batch_store = Self { @@ -142,6 +144,7 @@ impl BatchStore { batch_quota, validator_signer, persist_subscribers: DashMap::new(), + expiration_buffer_usecs, }; let db_content = db_clone .get_all_batches() @@ -283,7 +286,11 @@ impl BatchStore { // pub(crate) for testing #[allow(clippy::unwrap_used)] pub(crate) fn clear_expired_payload(&self, certified_time: u64) -> Vec { - let expired_digests = self.expirations.lock().unwrap().expire(certified_time); + // To help slow nodes catch up via execution without going to state sync, we keep batches for an extra + // `expiration_buffer_usecs` (the quorum store builder passes 60 seconds) after their expiration time. This lets + // remote peers fetch batches that just expired but are still within their execution window. + let expiration_time = certified_time.saturating_sub(self.expiration_buffer_usecs); + let expired_digests = self.expirations.lock().unwrap().expire(expiration_time); let mut ret = Vec::new(); for h in expired_digests { let removed_value = match self.db_cache.entry(h) { @@ -291,7 +298,7 @@ impl BatchStore { // We need to check up-to-date expiration again because receiving the same // digest with a higher expiration would update the persisted value and // effectively extend the expiration. 
- if entry.get().expiration() <= certified_time { + if entry.get().expiration() <= expiration_time { self.persist_subscribers.remove(entry.get().digest()); Some(entry.remove()) } else { diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index f6c9963f671ae..50302d9f05f4d 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -260,6 +260,7 @@ impl InnerBuilder { self.config.db_quota, self.config.batch_quota, signer, + Duration::from_secs(60).as_micros() as u64, )); self.batch_store = Some(batch_store.clone()); let batch_reader = Arc::new(BatchReaderImpl::new(batch_store.clone(), batch_requester)); diff --git a/consensus/src/quorum_store/tests/batch_store_test.rs b/consensus/src/quorum_store/tests/batch_store_test.rs index fed469c4df93f..196255f69e50a 100644 --- a/consensus/src/quorum_store/tests/batch_store_test.rs +++ b/consensus/src/quorum_store/tests/batch_store_test.rs @@ -36,6 +36,7 @@ pub fn batch_store_for_test(memory_quota: usize) -> Arc { 2001, // db quota 2001, // batch quota signers[0].clone(), + 0, )) } diff --git a/testsuite/forge-cli/src/suites/realistic_environment.rs b/testsuite/forge-cli/src/suites/realistic_environment.rs index ffcf4caf6db67..28a1e6e20e2b8 100644 --- a/testsuite/forge-cli/src/suites/realistic_environment.rs +++ b/testsuite/forge-cli/src/suites/realistic_environment.rs @@ -303,9 +303,9 @@ pub(crate) fn realistic_env_max_load_test( .add_system_metrics_threshold(SystemMetricsThreshold::new( // Check that we don't use more than 18 CPU cores for 15% of the time. MetricsThreshold::new(25.0, 15), - // Memory starts around 7GB, and grows around 1.4GB/hr in this test. + // Memory starts around 8GB, and grows around 1.4GB/hr in this test. // Check that we don't use more than final expected memory for more than 20% of the time. 
- MetricsThreshold::new_gb(7.0 + 1.4 * (duration_secs as f64 / 3600.0), 20), + MetricsThreshold::new_gb(8.0 + 1.4 * (duration_secs as f64 / 3600.0), 20), )) .add_no_restarts() .add_wait_for_catchup_s( @@ -316,7 +316,7 @@ pub(crate) fn realistic_env_max_load_test( .add_latency_threshold(4.5, LatencyType::P70) .add_chain_progress(StateProgressThreshold { max_non_epoch_no_progress_secs: 15.0, - max_epoch_no_progress_secs: 15.0, + max_epoch_no_progress_secs: 16.0, max_non_epoch_round_gap: 4, max_epoch_round_gap: 4, });