diff --git a/consensus/src/quorum_store/batch_store.rs b/consensus/src/quorum_store/batch_store.rs index 74720737dd278..523744c76749f 100644 --- a/consensus/src/quorum_store/batch_store.rs +++ b/consensus/src/quorum_store/batch_store.rs @@ -123,6 +123,7 @@ pub struct BatchStore { impl BatchStore { pub(crate) fn new( epoch: u64, + is_new_epoch: bool, last_certified_time: u64, db: Arc, memory_quota: usize, @@ -146,18 +147,73 @@ impl BatchStore { persist_subscribers: DashMap::new(), expiration_buffer_usecs, }; - let db_content = db_clone - .get_all_batches() - .expect("failed to read data from db"); + + if is_new_epoch { + tokio::task::spawn_blocking(move || { + Self::gc_previous_epoch_batches_from_db(db_clone, epoch); + }); + } else { + Self::populate_cache_and_gc_expired_batches( + db_clone, + epoch, + last_certified_time, + expiration_buffer_usecs, + &batch_store, + ); + } + + batch_store + } + + fn gc_previous_epoch_batches_from_db(db: Arc, current_epoch: u64) { + let db_content = db.get_all_batches().expect("failed to read data from db"); + info!( + epoch = current_epoch, + "QS: Read batches from storage. Len: {}", + db_content.len(), + ); + let mut expired_keys = Vec::new(); - trace!( - "QS: Batchreader {} {} {}", + for (digest, value) in db_content { + let epoch = value.epoch(); + + trace!( + "QS: Batchreader recovery content epoch {:?}, digest {}", + epoch, + digest + ); + + if epoch < current_epoch { + expired_keys.push(digest); + } + } + + info!( + "QS: Batch store bootstrap expired keys len {}", + expired_keys.len() + ); + db.delete_batches(expired_keys) + .expect("Deletion of expired keys should not fail"); + } + + fn populate_cache_and_gc_expired_batches( + db: Arc, + current_epoch: u64, + last_certified_time: u64, + expiration_buffer_usecs: u64, + batch_store: &BatchStore, + ) { + let db_content = db.get_all_batches().expect("failed to read data from db"); + info!( + epoch = current_epoch, + "QS: Read batches from storage. 
Len: {}, Last Certified Time: {}", db_content.len(), - epoch, last_certified_time ); + + let mut expired_keys = Vec::new(); for (digest, value) in db_content { - let expiration = value.expiration(); + let expiration = value.expiration().saturating_sub(expiration_buffer_usecs); trace!( "QS: Batchreader recovery content exp {:?}, digest {}", @@ -173,15 +229,15 @@ impl BatchStore { .expect("Storage limit exceeded upon BatchReader construction"); } } - trace!( - "QS: Batchreader recovery expired keys len {}", + + info!( + "QS: Batch store bootstrap expired keys len {}", expired_keys.len() ); - db_clone - .delete_batches(expired_keys) - .expect("Deletion of expired keys should not fail"); - - batch_store + tokio::task::spawn_blocking(move || { + db.delete_batches(expired_keys) + .expect("Deletion of expired keys should not fail"); + }); } fn epoch(&self) -> u64 { diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index f56fd94397129..069fa3d241eda 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -5,6 +5,7 @@ use super::quorum_store_db::QuorumStoreStorage; use crate::{ consensus_observer::publisher::consensus_publisher::ConsensusPublisher, error::error_kind, + monitor, network::{IncomingBatchRetrievalRequest, NetworkSender}, network_interface::ConsensusMsg, payload_manager::{DirectMempoolPayloadManager, QuorumStorePayloadManager, TPayloadManager}, @@ -235,6 +236,7 @@ impl InnerBuilder { .get_latest_ledger_info() .expect("could not get latest ledger info"); let last_committed_timestamp = latest_ledger_info_with_sigs.commit_info().timestamp_usecs(); + let is_new_epoch = latest_ledger_info_with_sigs.ledger_info().ends_epoch(); let batch_requester = BatchRequester::new( self.epoch, @@ -248,6 +250,7 @@ impl InnerBuilder { ); let batch_store = Arc::new(BatchStore::new( self.epoch, + is_new_epoch, last_committed_timestamp, 
self.quorum_store_storage.clone(), self.config.memory_quota, @@ -434,7 +437,7 @@ impl InnerBuilder { Arc, Option>, ) { - let batch_reader = self.create_batch_store(); + let batch_reader = monitor!("qs_create_batch_store", self.create_batch_store()); ( Arc::from(QuorumStorePayloadManager::new( diff --git a/consensus/src/quorum_store/tests/batch_proof_queue_test.rs b/consensus/src/quorum_store/tests/batch_proof_queue_test.rs index 96ab5414ab120..49746df432b75 100644 --- a/consensus/src/quorum_store/tests/batch_proof_queue_test.rs +++ b/consensus/src/quorum_store/tests/batch_proof_queue_test.rs @@ -58,8 +58,8 @@ fn proof_of_store_with_size( ) } -#[test] -fn test_proof_queue_sorting() { +#[tokio::test] +async fn test_proof_queue_sorting() { let my_peer_id = PeerId::random(); let batch_store = batch_store_for_test(5 * 1024 * 1024); let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1); @@ -145,8 +145,8 @@ fn test_proof_queue_sorting() { assert_eq!(count_author_1, 2); } -#[test] -fn test_proof_calculate_remaining_txns_and_proofs() { +#[tokio::test] +async fn test_proof_calculate_remaining_txns_and_proofs() { let my_peer_id = PeerId::random(); let batch_store = batch_store_for_test(5 * 1024 * 1024); let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1); @@ -405,8 +405,8 @@ fn test_proof_calculate_remaining_txns_and_proofs() { assert_eq!(proof_queue.batch_summaries_len(), 0); } -#[test] -fn test_proof_pull_proofs_with_duplicates() { +#[tokio::test] +async fn test_proof_pull_proofs_with_duplicates() { let my_peer_id = PeerId::random(); let batch_store = batch_store_for_test(5 * 1024 * 1024); let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1); @@ -656,8 +656,8 @@ fn test_proof_pull_proofs_with_duplicates() { assert!(proof_queue.is_empty()); } -#[test] -fn test_proof_queue_soft_limit() { +#[tokio::test] +async fn test_proof_queue_soft_limit() { let my_peer_id = PeerId::random(); let batch_store = batch_store_for_test(5 * 
1024 * 1024); let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1); @@ -698,8 +698,8 @@ fn test_proof_queue_soft_limit() { assert_eq!(num_unique_txns, 20); } -#[test] -fn test_proof_queue_insert_after_commit() { +#[tokio::test] +async fn test_proof_queue_insert_after_commit() { let my_peer_id = PeerId::random(); let batch_store = batch_store_for_test(5 * 1024); let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1); @@ -730,8 +730,8 @@ fn test_proof_queue_insert_after_commit() { assert!(proof_queue.is_empty()); } -#[test] -fn test_proof_queue_pull_full_utilization() { +#[tokio::test] +async fn test_proof_queue_pull_full_utilization() { let my_peer_id = PeerId::random(); let batch_store = batch_store_for_test(5 * 1024); let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1); diff --git a/consensus/src/quorum_store/tests/batch_store_test.rs b/consensus/src/quorum_store/tests/batch_store_test.rs index 196255f69e50a..2992700b05dd2 100644 --- a/consensus/src/quorum_store/tests/batch_store_test.rs +++ b/consensus/src/quorum_store/tests/batch_store_test.rs @@ -30,6 +30,7 @@ pub fn batch_store_for_test(memory_quota: usize) -> Arc { Arc::new(BatchStore::new( 10, // epoch + false, 10, // last committed round db, memory_quota, // memory_quota @@ -61,8 +62,8 @@ fn request_for_test( ) } -#[test] -fn test_insert_expire() { +#[tokio::test] +async fn test_insert_expire() { let batch_store = batch_store_for_test(30); let digest = HashValue::random(); @@ -226,8 +227,8 @@ fn test_quota_manager() { assert_ok_eq!(qm.update_quota(2), StorageMode::MemoryAndPersisted); } -#[test] -fn test_get_local_batch() { +#[tokio::test] +async fn test_get_local_batch() { let store = batch_store_for_test(30); let digest_1 = HashValue::random();