[qs] batch store bootstrap perf improvements (#15491)
* [qs] Use expiration buffer to cleanup during bootstrap
* [qs] async gc old epoch batches from batch store
* [qs] monitor! create batch store
ibalajiarun authored Dec 4, 2024
1 parent 96612fd commit 63f0df8
Showing 4 changed files with 91 additions and 31 deletions.
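The heart of the change is in BatchStore::new: when the store is constructed at an epoch boundary, batches persisted by the previous epoch are garbage-collected on a blocking thread instead of being loaded into the cache, and even on a same-epoch restart the DB deletion of expired batches is deferred off the construction path. Below is a minimal, self-contained sketch of that control flow; the Storage trait and the two helper functions are stand-ins for the real QuorumStoreStorage trait and BatchStore methods shown in the diff that follows.

use std::sync::Arc;

// Stand-ins for the real quorum-store pieces (see the batch_store.rs diff).
trait Storage: Send + Sync {}

fn gc_previous_epoch_batches(_db: Arc<dyn Storage>, _current_epoch: u64) {
    // Delete every persisted batch whose epoch is older than `current_epoch`.
}

fn populate_cache_and_gc_expired(_db: Arc<dyn Storage>, _current_epoch: u64) {
    // Rebuild the in-memory cache and collect expired digests for deletion.
}

// Simplified control flow of the new bootstrap path.
fn bootstrap(is_new_epoch: bool, db: Arc<dyn Storage>, current_epoch: u64) {
    if is_new_epoch {
        // At an epoch boundary nothing from the previous epoch is needed in
        // memory, so stale batches are deleted on a blocking thread rather
        // than stalling startup.
        tokio::task::spawn_blocking(move || gc_previous_epoch_batches(db, current_epoch));
    } else {
        // Same-epoch restart: the cache is rebuilt synchronously, but the
        // DB deletion of expired batches is still pushed off this path.
        populate_cache_and_gc_expired(db, current_epoch);
    }
}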
84 changes: 70 additions & 14 deletions consensus/src/quorum_store/batch_store.rs
@@ -123,6 +123,7 @@ pub struct BatchStore {
impl BatchStore {
pub(crate) fn new(
epoch: u64,
is_new_epoch: bool,
last_certified_time: u64,
db: Arc<dyn QuorumStoreStorage>,
memory_quota: usize,
@@ -146,18 +147,73 @@ impl BatchStore {
persist_subscribers: DashMap::new(),
expiration_buffer_usecs,
};
if is_new_epoch {
tokio::task::spawn_blocking(move || {
Self::gc_previous_epoch_batches_from_db(db_clone, epoch);
});
} else {
Self::populate_cache_and_gc_expired_batches(
db_clone,
epoch,
last_certified_time,
expiration_buffer_usecs,
&batch_store,
);
}

batch_store
}

fn gc_previous_epoch_batches_from_db(db: Arc<dyn QuorumStoreStorage>, current_epoch: u64) {
let db_content = db.get_all_batches().expect("failed to read data from db");
info!(
epoch = current_epoch,
"QS: Read batches from storage. Len: {}",
db_content.len(),
);

let mut expired_keys = Vec::new();
for (digest, value) in db_content {
let epoch = value.epoch();

trace!(
"QS: Batchreader recovery content epoch {:?}, digest {}",
epoch,
digest
);

if epoch < current_epoch {
expired_keys.push(digest);
}
}

info!(
"QS: Batch store bootstrap expired keys len {}",
expired_keys.len()
);
db.delete_batches(expired_keys)
.expect("Deletion of expired keys should not fail");
}

fn populate_cache_and_gc_expired_batches(
db: Arc<dyn QuorumStoreStorage>,
current_epoch: u64,
last_certified_time: u64,
expiration_buffer_usecs: u64,
batch_store: &BatchStore,
) {
let db_content = db.get_all_batches().expect("failed to read data from db");
info!(
epoch = current_epoch,
"QS: Read batches from storage. Len: {}, Last Cerified Time: {}",
db_content.len(),
epoch,
last_certified_time
);

let mut expired_keys = Vec::new();
for (digest, value) in db_content {
let expiration = value.expiration().saturating_sub(expiration_buffer_usecs);

trace!(
"QS: Batchreader recovery content exp {:?}, digest {}",
@@ -173,15 +229,15 @@
.expect("Storage limit exceeded upon BatchReader construction");
}
}
info!(
"QS: Batch store bootstrap expired keys len {}",
expired_keys.len()
);
tokio::task::spawn_blocking(move || {
db.delete_batches(expired_keys)
.expect("Deletion of expired keys should not fail");
});
}

fn epoch(&self) -> u64 {
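One detail worth noting in populate_cache_and_gc_expired_batches: each batch's expiration is first pulled earlier by expiration_buffer_usecs via saturating_sub, so batches that are about to expire are treated as already expired at bootstrap and never re-enter the cache. The small illustration below shows that arithmetic; the comparison against last_certified_time is an assumption made for the example, while the buffer subtraction itself is what the diff adds.

/// Effective expiration used at bootstrap: the nominal expiration shifted
/// earlier by the buffer, clamped at zero (mirrors the `saturating_sub` above).
fn effective_expiration(expiration_usecs: u64, buffer_usecs: u64) -> u64 {
    expiration_usecs.saturating_sub(buffer_usecs)
}

#[test]
fn buffer_pulls_expiration_earlier() {
    // Hypothetical timestamps in microseconds.
    let last_certified_time = 10_000_000;
    // The batch nominally expires 1s after the last certified time...
    let expiration = 11_000_000;
    // ...but with a 2s buffer it counts as expired at bootstrap.
    assert!(effective_expiration(expiration, 2_000_000) <= last_certified_time);
    // Without the buffer the same batch would still be considered live.
    assert!(effective_expiration(expiration, 0) > last_certified_time);
    // Saturating: the subtraction can never underflow past zero.
    assert_eq!(effective_expiration(500, 1_000), 0);
}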
5 changes: 4 additions & 1 deletion consensus/src/quorum_store/quorum_store_builder.rs
@@ -5,6 +5,7 @@ use super::quorum_store_db::QuorumStoreStorage;
use crate::{
consensus_observer::publisher::consensus_publisher::ConsensusPublisher,
error::error_kind,
monitor,
network::{IncomingBatchRetrievalRequest, NetworkSender},
network_interface::ConsensusMsg,
payload_manager::{DirectMempoolPayloadManager, QuorumStorePayloadManager, TPayloadManager},
@@ -235,6 +236,7 @@ impl InnerBuilder {
.get_latest_ledger_info()
.expect("could not get latest ledger info");
let last_committed_timestamp = latest_ledger_info_with_sigs.commit_info().timestamp_usecs();
let is_new_epoch = latest_ledger_info_with_sigs.ledger_info().ends_epoch();

let batch_requester = BatchRequester::new(
self.epoch,
@@ -248,6 +250,7 @@
);
let batch_store = Arc::new(BatchStore::new(
self.epoch,
is_new_epoch,
last_committed_timestamp,
self.quorum_store_storage.clone(),
self.config.memory_quota,
@@ -434,7 +437,7 @@ impl InnerBuilder {
Arc<dyn TPayloadManager>,
Option<aptos_channel::Sender<AccountAddress, (Author, VerifiedEvent)>>,
) {
let batch_reader = monitor!("qs_create_batch_store", self.create_batch_store());

(
Arc::from(QuorumStorePayloadManager::new(
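On the builder side, is_new_epoch is derived from whether the latest committed ledger info ends its epoch, and the batch store construction is wrapped in the monitor! macro so its bootstrap cost is tracked like other monitored consensus steps. The sketch below models only the two accessors the builder uses, with a stand-in type in place of the real LedgerInfoWithSignatures.

// Stand-in for the accessors used by the builder; only the two calls
// relevant to bootstrap are modeled here.
struct LatestLedgerInfo {
    ends_epoch: bool,
    timestamp_usecs: u64,
}

impl LatestLedgerInfo {
    fn ends_epoch(&self) -> bool {
        self.ends_epoch
    }
    fn timestamp_usecs(&self) -> u64 {
        self.timestamp_usecs
    }
}

// If the last committed ledger info ends its epoch, the node is starting a
// fresh epoch, so every batch already persisted belongs to an older epoch
// and the async-GC bootstrap path can be taken.
fn bootstrap_inputs(latest: &LatestLedgerInfo) -> (bool, u64) {
    (latest.ends_epoch(), latest.timestamp_usecs())
}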
24 changes: 12 additions & 12 deletions consensus/src/quorum_store/tests/batch_proof_queue_test.rs
@@ -58,8 +58,8 @@ fn proof_of_store_with_size(
)
}

#[tokio::test]
async fn test_proof_queue_sorting() {
let my_peer_id = PeerId::random();
let batch_store = batch_store_for_test(5 * 1024 * 1024);
let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1);
@@ -145,8 +145,8 @@ fn test_proof_queue_sorting() {
assert_eq!(count_author_1, 2);
}

#[tokio::test]
async fn test_proof_calculate_remaining_txns_and_proofs() {
let my_peer_id = PeerId::random();
let batch_store = batch_store_for_test(5 * 1024 * 1024);
let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1);
@@ -405,8 +405,8 @@ fn test_proof_calculate_remaining_txns_and_proofs() {
assert_eq!(proof_queue.batch_summaries_len(), 0);
}

#[tokio::test]
async fn test_proof_pull_proofs_with_duplicates() {
let my_peer_id = PeerId::random();
let batch_store = batch_store_for_test(5 * 1024 * 1024);
let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1);
@@ -656,8 +656,8 @@ fn test_proof_pull_proofs_with_duplicates() {
assert!(proof_queue.is_empty());
}

#[tokio::test]
async fn test_proof_queue_soft_limit() {
let my_peer_id = PeerId::random();
let batch_store = batch_store_for_test(5 * 1024 * 1024);
let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1);
@@ -698,8 +698,8 @@ fn test_proof_queue_soft_limit() {
assert_eq!(num_unique_txns, 20);
}

#[tokio::test]
async fn test_proof_queue_insert_after_commit() {
let my_peer_id = PeerId::random();
let batch_store = batch_store_for_test(5 * 1024);
let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1);
@@ -730,8 +730,8 @@ fn test_proof_queue_insert_after_commit() {
assert!(proof_queue.is_empty());
}

#[tokio::test]
async fn test_proof_queue_pull_full_utilization() {
let my_peer_id = PeerId::random();
let batch_store = batch_store_for_test(5 * 1024);
let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1);
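The test changes follow from the bootstrap change: BatchStore::new can now call tokio::task::spawn_blocking on either path, and spawn_blocking panics when there is no ambient Tokio runtime, which is presumably why the affected #[test] functions become #[tokio::test]. A minimal illustration of the pattern (the body is a placeholder, not one of the real test cases):

// Anything that constructs a BatchStore-like object which internally calls
// tokio::task::spawn_blocking needs an ambient runtime; #[tokio::test]
// provides one and lets the test body be async.
#[tokio::test]
async fn runs_blocking_bootstrap_work_inside_a_runtime() {
    let result = tokio::task::spawn_blocking(|| {
        // Placeholder for the deferred DB cleanup done at bootstrap.
        2 + 2
    })
    .await
    .expect("blocking task should not panic");
    assert_eq!(result, 4);
}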
9 changes: 5 additions & 4 deletions consensus/src/quorum_store/tests/batch_store_test.rs
@@ -30,6 +30,7 @@ pub fn batch_store_for_test(memory_quota: usize) -> Arc<BatchStore> {

Arc::new(BatchStore::new(
10, // epoch
false,
10, // last committed round
db,
memory_quota, // memory_quota
@@ -61,8 +62,8 @@
)
}

#[tokio::test]
async fn test_insert_expire() {
let batch_store = batch_store_for_test(30);

let digest = HashValue::random();
@@ -226,8 +227,8 @@
assert_ok_eq!(qm.update_quota(2), StorageMode::MemoryAndPersisted);
}

#[tokio::test]
async fn test_get_local_batch() {
let store = batch_store_for_test(30);

let digest_1 = HashValue::random();
