From 9060deec329885d6059d8c0bad17e5b2e9cb848e Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Thu, 21 Nov 2024 16:21:35 -0800 Subject: [PATCH] [qs] grace period before GC committed batches --- consensus/src/quorum_store/batch_store.rs | 11 +++++++++-- consensus/src/quorum_store/quorum_store_builder.rs | 1 + consensus/src/quorum_store/tests/batch_store_test.rs | 1 + 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/consensus/src/quorum_store/batch_store.rs b/consensus/src/quorum_store/batch_store.rs index f8d32b23137ff..74720737dd278 100644 --- a/consensus/src/quorum_store/batch_store.rs +++ b/consensus/src/quorum_store/batch_store.rs @@ -117,6 +117,7 @@ pub struct BatchStore { batch_quota: usize, validator_signer: ValidatorSigner, persist_subscribers: DashMap>>, + expiration_buffer_usecs: u64, } impl BatchStore { @@ -128,6 +129,7 @@ impl BatchStore { db_quota: usize, batch_quota: usize, validator_signer: ValidatorSigner, + expiration_buffer_usecs: u64, ) -> Self { let db_clone = db.clone(); let batch_store = Self { @@ -142,6 +144,7 @@ impl BatchStore { batch_quota, validator_signer, persist_subscribers: DashMap::new(), + expiration_buffer_usecs, }; let db_content = db_clone .get_all_batches() @@ -283,7 +286,11 @@ impl BatchStore { // pub(crate) for testing #[allow(clippy::unwrap_used)] pub(crate) fn clear_expired_payload(&self, certified_time: u64) -> Vec { - let expired_digests = self.expirations.lock().unwrap().expire(certified_time); + // To help slow nodes catch up via execution without going to state sync, we keep batches for an extra + // `expiration_buffer_usecs` past their expiration time (the builder passes 60 seconds). This lets remote peers + // fetch batches that just expired but are still within their execution window. 
+ let expiration_time = certified_time.saturating_sub(self.expiration_buffer_usecs); + let expired_digests = self.expirations.lock().unwrap().expire(expiration_time); let mut ret = Vec::new(); for h in expired_digests { let removed_value = match self.db_cache.entry(h) { @@ -291,7 +298,7 @@ impl BatchStore { // We need to check up-to-date expiration again because receiving the same // digest with a higher expiration would update the persisted value and // effectively extend the expiration. - if entry.get().expiration() <= certified_time { + if entry.get().expiration() <= expiration_time { self.persist_subscribers.remove(entry.get().digest()); Some(entry.remove()) } else { diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index f6c9963f671ae..50302d9f05f4d 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -260,6 +260,7 @@ impl InnerBuilder { self.config.db_quota, self.config.batch_quota, signer, + Duration::from_secs(60).as_micros() as u64, )); self.batch_store = Some(batch_store.clone()); let batch_reader = Arc::new(BatchReaderImpl::new(batch_store.clone(), batch_requester)); diff --git a/consensus/src/quorum_store/tests/batch_store_test.rs b/consensus/src/quorum_store/tests/batch_store_test.rs index fed469c4df93f..196255f69e50a 100644 --- a/consensus/src/quorum_store/tests/batch_store_test.rs +++ b/consensus/src/quorum_store/tests/batch_store_test.rs @@ -36,6 +36,7 @@ pub fn batch_store_for_test(memory_quota: usize) -> Arc { 2001, // db quota 2001, // batch quota signers[0].clone(), + 0, )) }