Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick] PR 7070 #7096

Merged
merged 1 commit into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 54 additions & 27 deletions consensus/src/quorum_store/batch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {
expired_keys.push(digest);
} else {
batch_store
.update_cache(digest, value)
.insert_to_cache(digest, value)
.expect("Storage limit exceeded upon BatchReader construction");
}
}
Expand Down Expand Up @@ -219,28 +219,64 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {
);
}

// Return an error if storage quota is exceeded.
fn update_cache(&self, digest: HashValue, mut value: PersistedValue) -> anyhow::Result<()> {
// Inserts a PersistedValue into the in-memory db_cache. If an entry with a higher
// value is already in the db_cache, Ok(false) is returned. If there was no entry
// Ok(true) is returned after the successful insertion. Finally, the method returns
// an error if storage quota is exceeded (if in-memory quota is exceeded,
// only the metadata is stored in the db-cache).
// Note: holds db_cache entry lock (due to DashMap), while accessing peer_quota
// DashMap. Hence, peer_quota reference should never be held while accessing the
// db_cache to avoid the deadlock (if needed, order is db_cache, then peer_quota).
pub(crate) fn insert_to_cache(
&self,
digest: HashValue,
mut value: PersistedValue,
) -> anyhow::Result<bool> {
let author = value.author;
if self
.peer_quota
.entry(author)
.or_insert(QuotaManager::new(self.db_quota, self.memory_quota))
.update_quota(value.num_bytes)?
== StorageMode::PersistedOnly
let expiration_round = value.expiration.round();

{
value.remove_payload();
}
// Acquire dashmap internal lock on the entry corresponding to the digest.
let cache_entry = self.db_cache.entry(digest);

if let Occupied(entry) = &cache_entry {
if entry.get().expiration.round() >= value.expiration.round() {
debug!(
"QS: already have the digest with higher expiration {}",
digest
);
return Ok(false);
}
};

let expiration_round = value.expiration.round();
if let Some(prev_value) = self.db_cache.insert(digest, value) {
self.free_quota(prev_value);
if self
.peer_quota
.entry(author)
.or_insert(QuotaManager::new(self.db_quota, self.memory_quota))
.update_quota(value.num_bytes)?
== StorageMode::PersistedOnly
{
value.remove_payload();
}

match cache_entry {
Occupied(entry) => {
let (k, prev_value) = entry.replace_entry(value);
debug_assert!(k == digest);
self.free_quota(prev_value);
},
Vacant(slot) => {
slot.insert(value);
},
}
}

// Add expiration for the inserted entry, no need to be atomic w. insertion.
self.expirations
.lock()
.unwrap()
.add_item(digest, expiration_round);
Ok(())
Ok(true)
}

pub(crate) fn save(&self, digest: HashValue, value: PersistedValue) -> anyhow::Result<bool> {
Expand All @@ -266,17 +302,7 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {
Ok(false)
});

if let Some(entry) = self.db_cache.get(&digest) {
if entry.expiration.round() >= value.expiration.round() {
debug!(
"QS: already have the digest with higher expiration {}",
digest
);
return Ok(false);
}
}
self.update_cache(digest, value)?;
return Ok(true);
return self.insert_to_cache(digest, value);
}
}
bail!("Incorrect expiration {:?} with init gap {} in epoch {}, last committed round {} and max behind gap {} max beyond gap {}",
Expand All @@ -288,7 +314,8 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {
self.batch_expiry_round_gap_beyond_latest_certified);
}

fn clear_expired_payload(&self, certified_time: LogicalTime) -> Vec<HashValue> {
// pub(crate) for testing
pub(crate) fn clear_expired_payload(&self, certified_time: LogicalTime) -> Vec<HashValue> {
assert_eq!(
certified_time.epoch(),
self.epoch(),
Expand Down
75 changes: 66 additions & 9 deletions consensus/src/quorum_store/tests/batch_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ use aptos_consensus_types::proof_of_store::LogicalTime;
use aptos_crypto::HashValue;
use aptos_temppath::TempPath;
use aptos_types::{account_address::AccountAddress, validator_verifier::random_validator_verifier};
use claims::assert_ok_eq;
use futures::executor::block_on;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tokio::{sync::mpsc::channel, task::spawn_blocking};

#[tokio::test(flavor = "multi_thread")]
async fn test_extend_expiration_vs_save() {
let num_experiments = 2000;
fn batch_store_for_test_no_db(memory_quota: usize) -> Arc<BatchStore<MockQuorumStoreSender>> {
let tmp_dir = TempPath::new();
let db = Arc::new(QuorumStoreDB::new(&tmp_dir));
let (tx, _rx) = channel(10);
Expand All @@ -34,20 +33,78 @@ async fn test_extend_expiration_vs_save() {
);
let (signers, validator_verifier) = random_validator_verifier(4, None, false);

let batch_store = Arc::new(BatchStore::new(
Arc::new(BatchStore::new(
10, // epoch
10, // last committed round
db,
0,
0,
2100,
0, // grace period rounds
0, // memory_quota
1000, // db quota
0, // grace period rounds
memory_quota, // memory_quota
1000, // db quota
requester,
signers[0].clone(),
validator_verifier,
));
))
}

#[test]
fn test_insert_expire() {
let batch_store = batch_store_for_test_no_db(30);

let digest = HashValue::random();
assert_ok_eq!(
batch_store.insert_to_cache(
digest,
PersistedValue::new(
Some(Vec::new()),
LogicalTime::new(10, 15), // Expiration
AccountAddress::random(),
10,
),
),
true
);

assert_ok_eq!(
batch_store.insert_to_cache(
digest,
PersistedValue::new(
Some(Vec::new()),
LogicalTime::new(10, 30), // Expiration
AccountAddress::random(),
10,
),
),
true
);
assert_ok_eq!(
batch_store.insert_to_cache(
digest,
PersistedValue::new(
Some(Vec::new()),
LogicalTime::new(10, 25), // Expiration
AccountAddress::random(),
10,
),
),
false
);
let expired = batch_store.clear_expired_payload(LogicalTime::new(10, 27));
assert!(expired.is_empty());
let expired = batch_store.clear_expired_payload(LogicalTime::new(10, 29));
assert!(expired.is_empty());
assert_eq!(
batch_store.clear_expired_payload(LogicalTime::new(10, 30)),
vec![digest]
);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_extend_expiration_vs_save() {
let num_experiments = 2000;
let batch_store = batch_store_for_test_no_db(0);

let batch_store_clone1 = batch_store.clone();
let batch_store_clone2 = batch_store.clone();
Expand Down Expand Up @@ -156,5 +213,5 @@ async fn test_extend_expiration_vs_save() {
// TODO: last certified round.
// TODO: check correct digests are returned.
// TODO: check grace period.
// TODO: check quota.
// TODO: check quota (cache vs persisted).
// TODO: check the channels.