[qs] batch store bootstrap perf improvements #15491

Merged
merged 3 commits into from
Dec 4, 2024
84 changes: 70 additions & 14 deletions consensus/src/quorum_store/batch_store.rs
@@ -123,6 +123,7 @@ pub struct BatchStore
impl BatchStore {
pub(crate) fn new(
epoch: u64,
is_new_epoch: bool,
last_certified_time: u64,
db: Arc<dyn QuorumStoreStorage>,
memory_quota: usize,
@@ -146,18 +147,73 @@ impl BatchStore {
persist_subscribers: DashMap::new(),
expiration_buffer_usecs,
};
let db_content = db_clone
.get_all_batches()
.expect("failed to read data from db");

if is_new_epoch {
tokio::task::spawn_blocking(move || {
Self::gc_previous_epoch_batches_from_db(db_clone, epoch);
});
} else {
Self::populate_cache_and_gc_expired_batches(
[Inline comment from Contributor Author]
@bchocho @zekun000 I split the QS part of the PR out and addressed your comments here. I renamed the method to be clear.

db_clone,
epoch,
last_certified_time,
expiration_buffer_usecs,
&batch_store,
);
}

batch_store
}

fn gc_previous_epoch_batches_from_db(db: Arc<dyn QuorumStoreStorage>, current_epoch: u64) {
let db_content = db.get_all_batches().expect("failed to read data from db");
info!(
epoch = current_epoch,
"QS: Read batches from storage. Len: {}",
db_content.len(),
);

let mut expired_keys = Vec::new();
for (digest, value) in db_content {
let epoch = value.epoch();

trace!(
"QS: Batchreader recovery content epoch {:?}, digest {}",
epoch,
digest
);

if epoch < current_epoch {
expired_keys.push(digest);
}
}

info!(
"QS: Batch store bootstrap expired keys len {}",
expired_keys.len()
);
db.delete_batches(expired_keys)
.expect("Deletion of expired keys should not fail");
}
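
The new-epoch path above takes the entire DB scan and deletion off the bootstrap critical path by handing it to Tokio's blocking thread pool. Below is a minimal, self-contained sketch of that fire-and-forget pattern, assuming a Tokio runtime is available; `purge_previous_epochs` and the Vec-backed `db` are hypothetical stand-ins for the storage scan and the `delete_batches` call, not the aptos-core code.

```rust
use std::sync::Arc;

// Hypothetical stand-in for the synchronous scan + delete that BatchStore
// schedules in the background; too slow to run on the async startup path.
fn purge_previous_epochs(db: Arc<Vec<(u64, String)>>, current_epoch: u64) {
    let mut expired = 0;
    for (epoch, _digest) in db.iter() {
        if *epoch < current_epoch {
            expired += 1;
        }
    }
    println!("would delete {expired} batches from previous epochs");
}

#[tokio::main]
async fn main() {
    let db = Arc::new(vec![(9, "old".to_string()), (10, "current".to_string())]);
    let db_clone = Arc::clone(&db);
    // Dropping the JoinHandle detaches the task: bootstrap returns immediately
    // while the cleanup keeps running on the dedicated blocking pool.
    tokio::task::spawn_blocking(move || purge_previous_epochs(db_clone, 10));
    // ... bootstrap continues here without waiting for the GC to finish ...
}
```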

fn populate_cache_and_gc_expired_batches(
db: Arc<dyn QuorumStoreStorage>,
current_epoch: u64,
last_certified_time: u64,
expiration_buffer_usecs: u64,
batch_store: &BatchStore,
) {
let db_content = db.get_all_batches().expect("failed to read data from db");
info!(
epoch = current_epoch,
"QS: Read batches from storage. Len: {}, Last Cerified Time: {}",
db_content.len(),
epoch,
last_certified_time
);

let mut expired_keys = Vec::new();
for (digest, value) in db_content {
let expiration = value.expiration().saturating_sub(expiration_buffer_usecs);
[Inline comment from Contributor]
Good catch


trace!(
"QS: Batchreader recovery content exp {:?}, digest {}",
@@ -173,15 +229,15 @@
.expect("Storage limit exceeded upon BatchReader construction");
}
}
info!(
"QS: Batch store bootstrap expired keys len {}",
expired_keys.len()
);
tokio::task::spawn_blocking(move || {
db.delete_batches(expired_keys)
.expect("Deletion of expired keys should not fail");
});
}
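
The recovery loop above now subtracts `expiration_buffer_usecs` from each batch's expiration before comparing it against `last_certified_time`, and only the resulting deletions are pushed to `spawn_blocking` while cache population stays synchronous. A small sketch of that buffered-expiration arithmetic, with illustrative microsecond values and the comparison direction assumed from the surrounding recovery logic (expired if the buffered expiration is at or before the last certified time):

```rust
// Illustrative check, not the BatchStore implementation. saturating_sub keeps
// the subtraction from underflowing when the buffer exceeds the expiration.
fn is_expired(expiration_usecs: u64, buffer_usecs: u64, last_certified_time: u64) -> bool {
    expiration_usecs.saturating_sub(buffer_usecs) <= last_certified_time
}

fn main() {
    let last_certified_time = 1_000_000_000u64;
    // Nominally expires 5s after the last certified time, but a 10s buffer
    // pulls the effective expiration back far enough to treat it as expired.
    assert!(is_expired(1_005_000_000, 10_000_000, last_certified_time));
    // With no buffer the same batch is still considered live.
    assert!(!is_expired(1_005_000_000, 0, last_certified_time));
    // A tiny expiration with a huge buffer saturates to 0 instead of underflowing.
    assert!(is_expired(1, u64::MAX, last_certified_time));
    println!("buffered-expiration checks passed");
}
```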

fn epoch(&self) -> u64 {
5 changes: 4 additions & 1 deletion consensus/src/quorum_store/quorum_store_builder.rs
@@ -5,6 +5,7 @@ use super::quorum_store_db::QuorumStoreStorage;
use crate::{
consensus_observer::publisher::consensus_publisher::ConsensusPublisher,
error::error_kind,
monitor,
network::{IncomingBatchRetrievalRequest, NetworkSender},
network_interface::ConsensusMsg,
payload_manager::{DirectMempoolPayloadManager, QuorumStorePayloadManager, TPayloadManager},
@@ -235,6 +236,7 @@ impl InnerBuilder {
.get_latest_ledger_info()
.expect("could not get latest ledger info");
let last_committed_timestamp = latest_ledger_info_with_sigs.commit_info().timestamp_usecs();
let is_new_epoch = latest_ledger_info_with_sigs.ledger_info().ends_epoch();

let batch_requester = BatchRequester::new(
self.epoch,
@@ -248,6 +250,7 @@
);
let batch_store = Arc::new(BatchStore::new(
self.epoch,
is_new_epoch,
last_committed_timestamp,
self.quorum_store_storage.clone(),
self.config.memory_quota,
@@ -434,7 +437,7 @@ impl InnerBuilder {
Arc<dyn TPayloadManager>,
Option<aptos_channel::Sender<AccountAddress, (Author, VerifiedEvent)>>,
) {
let batch_reader = self.create_batch_store();
let batch_reader = monitor!("qs_create_batch_store", self.create_batch_store());

(
Arc::from(QuorumStorePayloadManager::new(
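
The builder change above derives `is_new_epoch` from `ends_epoch()` on the latest ledger info and forwards it to `BatchStore::new`, which is where the two bootstrap paths split; the `monitor!` wrapper only instruments the construction call with timing metrics. A hedged sketch of that gating decision, with illustrative names rather than the builder's real API:

```rust
// Sketch only: if the latest committed ledger info ends an epoch, every
// persisted quorum-store batch belongs to an earlier epoch, so bootstrap can
// skip synchronous cache population and only schedule background GC.
#[derive(Debug, PartialEq)]
enum BootstrapPath {
    GcPreviousEpochInBackground,
    PopulateCacheAndGcExpired,
}

fn choose_bootstrap_path(is_new_epoch: bool) -> BootstrapPath {
    if is_new_epoch {
        BootstrapPath::GcPreviousEpochInBackground
    } else {
        BootstrapPath::PopulateCacheAndGcExpired
    }
}

fn main() {
    // ends_epoch() == true: fresh epoch, nothing in the DB needs to be cached.
    assert_eq!(
        choose_bootstrap_path(true),
        BootstrapPath::GcPreviousEpochInBackground
    );
    // Mid-epoch restart: rebuild the cache, then GC only the expired batches.
    assert_eq!(
        choose_bootstrap_path(false),
        BootstrapPath::PopulateCacheAndGcExpired
    );
}
```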
24 changes: 12 additions & 12 deletions consensus/src/quorum_store/tests/batch_proof_queue_test.rs
@@ -58,8 +58,8 @@ fn proof_of_store_with_size(
)
}

#[test]
fn test_proof_queue_sorting() {
#[tokio::test]
async fn test_proof_queue_sorting() {
let my_peer_id = PeerId::random();
let batch_store = batch_store_for_test(5 * 1024 * 1024);
let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1);
Expand Down Expand Up @@ -145,8 +145,8 @@ fn test_proof_queue_sorting() {
assert_eq!(count_author_1, 2);
}

#[test]
fn test_proof_calculate_remaining_txns_and_proofs() {
#[tokio::test]
async fn test_proof_calculate_remaining_txns_and_proofs() {
let my_peer_id = PeerId::random();
let batch_store = batch_store_for_test(5 * 1024 * 1024);
let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1);
Expand Down Expand Up @@ -405,8 +405,8 @@ fn test_proof_calculate_remaining_txns_and_proofs() {
assert_eq!(proof_queue.batch_summaries_len(), 0);
}

#[test]
fn test_proof_pull_proofs_with_duplicates() {
#[tokio::test]
async fn test_proof_pull_proofs_with_duplicates() {
let my_peer_id = PeerId::random();
let batch_store = batch_store_for_test(5 * 1024 * 1024);
let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1);
Expand Down Expand Up @@ -656,8 +656,8 @@ fn test_proof_pull_proofs_with_duplicates() {
assert!(proof_queue.is_empty());
}

#[test]
fn test_proof_queue_soft_limit() {
#[tokio::test]
async fn test_proof_queue_soft_limit() {
let my_peer_id = PeerId::random();
let batch_store = batch_store_for_test(5 * 1024 * 1024);
let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1);
Expand Down Expand Up @@ -698,8 +698,8 @@ fn test_proof_queue_soft_limit() {
assert_eq!(num_unique_txns, 20);
}

#[test]
fn test_proof_queue_insert_after_commit() {
#[tokio::test]
async fn test_proof_queue_insert_after_commit() {
let my_peer_id = PeerId::random();
let batch_store = batch_store_for_test(5 * 1024);
let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1);
Expand Down Expand Up @@ -730,8 +730,8 @@ fn test_proof_queue_insert_after_commit() {
assert!(proof_queue.is_empty());
}

#[test]
fn test_proof_queue_pull_full_utilization() {
#[tokio::test]
async fn test_proof_queue_pull_full_utilization() {
let my_peer_id = PeerId::random();
let batch_store = batch_store_for_test(5 * 1024);
let mut proof_queue = BatchProofQueue::new(my_peer_id, batch_store, 1);
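
The tests above change from `#[test]` to `#[tokio::test]` because they construct a `BatchStore` through `batch_store_for_test`, and its bootstrap now reaches `tokio::task::spawn_blocking`, which panics when no Tokio runtime is active. A minimal illustration of that requirement, assuming tokio with the `rt` and `macros` features; the functions below are hypothetical, not the real tests:

```rust
// Hypothetical stand-in for the spawn_blocking call reached during
// BatchStore bootstrap in batch_store_for_test.
fn bootstrap_like_batch_store() {
    tokio::task::spawn_blocking(|| { /* blocking DB cleanup */ });
}

#[test]
#[should_panic] // outside a runtime, spawn_blocking cannot find a handle and panics
fn plain_test_has_no_runtime() {
    bootstrap_like_batch_store();
}

#[tokio::test]
async fn tokio_test_provides_a_runtime() {
    // The macro wraps the body in a runtime, so the same call succeeds.
    bootstrap_like_batch_store();
}
```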
9 changes: 5 additions & 4 deletions consensus/src/quorum_store/tests/batch_store_test.rs
@@ -30,6 +30,7 @@ pub fn batch_store_for_test(memory_quota: usize) -> Arc<BatchStore> {

Arc::new(BatchStore::new(
10, // epoch
false,
10, // last committed round
db,
memory_quota, // memory_quota
@@ -61,8 +62,8 @@
)
}

#[test]
fn test_insert_expire() {
#[tokio::test]
async fn test_insert_expire() {
let batch_store = batch_store_for_test(30);

let digest = HashValue::random();
@@ -226,8 +227,8 @@ fn test_quota_manager() {
assert_ok_eq!(qm.update_quota(2), StorageMode::MemoryAndPersisted);
}

#[test]
fn test_get_local_batch() {
#[tokio::test]
async fn test_get_local_batch() {
let store = batch_store_for_test(30);

let digest_1 = HashValue::random();