Skip to content

Commit

Permalink
[qs] use Deref to access batch info
Browse files Browse the repository at this point in the history
  • Loading branch information
zekun000 committed Mar 16, 2023
1 parent cf2c5e4 commit 510ffa0
Show file tree
Hide file tree
Showing 13 changed files with 148 additions and 131 deletions.
4 changes: 2 additions & 2 deletions consensus/consensus-types/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl Payload {
Payload::InQuorumStore(proof_with_status) => proof_with_status
.proofs
.iter()
.map(|proof| proof.info().num_txns as usize)
.map(|proof| proof.num_txns() as usize)
.sum(),
}
}
Expand All @@ -125,7 +125,7 @@ impl Payload {
Payload::InQuorumStore(proof_with_status) => proof_with_status
.proofs
.iter()
.map(|proof| proof.info().num_bytes as usize)
.map(|proof| proof.num_bytes() as usize)
.sum(),
}
}
Expand Down
82 changes: 50 additions & 32 deletions consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use serde::{Deserialize, Serialize};
use std::{
cmp::Ordering,
fmt::{Display, Formatter},
hash::Hash,
ops::Deref,
};

#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
Expand Down Expand Up @@ -87,12 +89,12 @@ impl Display for BatchId {
Clone, Debug, Deserialize, Serialize, CryptoHasher, BCSCryptoHash, PartialEq, Eq, Hash,
)]
pub struct BatchInfo {
pub author: PeerId,
pub batch_id: BatchId,
pub expiration: LogicalTime,
pub digest: HashValue,
pub num_txns: u64,
pub num_bytes: u64,
author: PeerId,
batch_id: BatchId,
expiration: LogicalTime,
digest: HashValue,
num_txns: u64,
num_bytes: u64,
}

impl BatchInfo {
Expand All @@ -117,12 +119,36 @@ impl BatchInfo {
pub fn epoch(&self) -> u64 {
self.expiration.epoch
}

pub fn author(&self) -> PeerId {
self.author
}

pub fn batch_id(&self) -> BatchId {
self.batch_id
}

pub fn expiration(&self) -> LogicalTime {
self.expiration
}

pub fn digest(&self) -> &HashValue {
&self.digest
}

pub fn num_txns(&self) -> u64 {
self.num_txns
}

pub fn num_bytes(&self) -> u64 {
self.num_bytes
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SignedBatchInfo {
signer: PeerId,
info: BatchInfo,
signer: PeerId,
signature: bls12381::Signature,
}

Expand All @@ -134,8 +160,8 @@ impl SignedBatchInfo {
let signature = validator_signer.sign(&batch_info)?;

Ok(Self {
signer: validator_signer.author(),
info: batch_info,
signer: validator_signer.author(),
signature,
})
}
Expand All @@ -144,10 +170,6 @@ impl SignedBatchInfo {
self.signer
}

pub fn epoch(&self) -> u64 {
self.info.epoch()
}

pub fn verify(&self, sender: PeerId, validator: &ValidatorVerifier) -> anyhow::Result<()> {
if sender == self.signer {
Ok(validator.verify(self.signer, &self.info, &self.signature)?)
Expand All @@ -156,16 +178,20 @@ impl SignedBatchInfo {
}
}

pub fn info(&self) -> &BatchInfo {
&self.info
}

pub fn signature(self) -> bls12381::Signature {
self.signature
}

pub fn digest(&self) -> HashValue {
self.info.digest
pub fn batch_info(&self) -> &BatchInfo {
&self.info
}
}

impl Deref for SignedBatchInfo {
type Target = BatchInfo;

fn deref(&self) -> &Self::Target {
&self.info
}
}

Expand All @@ -191,18 +217,6 @@ impl ProofOfStore {
}
}

pub fn info(&self) -> &BatchInfo {
&self.info
}

pub fn digest(&self) -> &HashValue {
&self.info.digest
}

pub fn expiration(&self) -> LogicalTime {
self.info.expiration
}

pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
validator
.verify_multi_signatures(&self.info, &self.multi_signature)
Expand All @@ -216,8 +230,12 @@ impl ProofOfStore {
ret.shuffle(&mut thread_rng());
ret
}
}

pub fn epoch(&self) -> u64 {
self.info.expiration.epoch
impl Deref for ProofOfStore {
type Target = BatchInfo;

fn deref(&self) -> &Self::Target {
&self.info
}
}
6 changes: 3 additions & 3 deletions consensus/src/quorum_store/batch_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct BatchCoordinator {
my_peer_id: PeerId,
network_sender: NetworkSender,
batch_store: Arc<BatchStore<NetworkSender>>,
max_batch_bytes: usize,
max_batch_bytes: u64,
}

impl BatchCoordinator {
Expand All @@ -34,7 +34,7 @@ impl BatchCoordinator {
my_peer_id: PeerId,
network_sender: NetworkSender,
batch_store: Arc<BatchStore<NetworkSender>>,
max_batch_bytes: usize,
max_batch_bytes: u64,
) -> Self {
Self {
epoch,
Expand Down Expand Up @@ -83,7 +83,7 @@ impl BatchCoordinator {
let network_sender = self.network_sender.clone();
let my_peer_id = self.my_peer_id;
tokio::spawn(async move {
let peer_id = persist_request.value.info.author;
let peer_id = persist_request.value.author();
if let Some(signed_batch_info) = batch_store.persist(persist_request) {
if my_peer_id != peer_id {
counters::RECEIVED_REMOTE_BATCHES_COUNT.inc();
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/quorum_store/batch_requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl<T: QuorumStoreSender + 'static> BatchRequester<T> {
match response {
Ok(batch) => {
counters::RECEIVED_BATCH_RESPONSE_COUNT.inc();
let digest = batch.digest();
let digest = *batch.digest();
let payload = batch.into_transactions();
request_state.serve_request(digest, Some(payload));
return;
Expand Down
59 changes: 21 additions & 38 deletions consensus/src/quorum_store/batch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
use crate::{
network::QuorumStoreSender,
quorum_store::{
batch_requester::BatchRequester, counters, quorum_store_db::QuorumStoreStorage,
types::PersistedValue, utils::RoundExpirations,
batch_requester::BatchRequester,
counters,
quorum_store_db::QuorumStoreStorage,
types::{PersistedValue, StorageMode},
utils::RoundExpirations,
},
};
use anyhow::bail;
Expand Down Expand Up @@ -38,13 +41,6 @@ pub struct PersistRequest {
pub digest: HashValue,
pub value: PersistedValue,
}

#[derive(PartialEq)]
enum StorageMode {
PersistedOnly,
MemoryAndPersisted,
}

struct QuotaManager {
memory_balance: usize,
db_balance: usize,
Expand Down Expand Up @@ -100,13 +96,6 @@ impl QuotaManager {
}
}

fn payload_storage_mode(persisted_value: &PersistedValue) -> StorageMode {
match persisted_value.maybe_payload {
Some(_) => StorageMode::MemoryAndPersisted,
None => StorageMode::PersistedOnly,
}
}

/// Provides in memory representation of stored batches (strong cache), and allows
/// efficient concurrent readers.
pub struct BatchStore<T> {
Expand Down Expand Up @@ -174,7 +163,7 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {
last_certified_round
);
for (digest, value) in db_content {
let expiration = value.info.expiration;
let expiration = value.expiration();

trace!(
"QS: Batchreader recovery content exp {:?}, digest {}",
Expand Down Expand Up @@ -209,11 +198,11 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {
fn free_quota(&self, persisted_value: PersistedValue) {
let mut quota_manager = self
.peer_quota
.get_mut(&persisted_value.info.author)
.get_mut(&persisted_value.author())
.expect("No QuotaManager for batch author");
quota_manager.free_quota(
persisted_value.info.num_bytes as usize,
payload_storage_mode(&persisted_value),
persisted_value.num_bytes() as usize,
persisted_value.payload_storage_mode(),
);
}

Expand All @@ -230,15 +219,15 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {
digest: HashValue,
mut value: PersistedValue,
) -> anyhow::Result<bool> {
let author = value.info.author;
let expiration_round = value.info.expiration.round();
let author = value.author();
let expiration_round = value.expiration().round();

{
// Acquire dashmap internal lock on the entry corresponding to the digest.
let cache_entry = self.db_cache.entry(digest);

if let Occupied(entry) = &cache_entry {
if entry.get().info.expiration.round() >= value.info.expiration.round() {
if entry.get().expiration().round() >= value.expiration().round() {
debug!(
"QS: already have the digest with higher expiration {}",
digest
Expand All @@ -255,7 +244,7 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {
self.memory_quota,
self.batch_quota,
))
.update_quota(value.info.num_bytes as usize)?
.update_quota(value.num_bytes() as usize)?
== StorageMode::PersistedOnly
{
value.remove_payload();
Expand All @@ -282,7 +271,7 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {
}

pub(crate) fn save(&self, digest: HashValue, value: PersistedValue) -> anyhow::Result<bool> {
let expiration = value.info.expiration;
let expiration = value.expiration();
if expiration.epoch() == self.epoch() {
// record the round gaps
if expiration.round() > self.last_certified_round() {
Expand Down Expand Up @@ -339,7 +328,7 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {
// We need to check up-to-date expiration again because receiving the same
// digest with a higher expiration would update the persisted value and
// effectively extend the expiration.
if entry.get().info.expiration.round() <= expired_round {
if entry.get().expiration().round() <= expired_round {
Some(entry.remove())
} else {
None
Expand All @@ -357,7 +346,7 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {
}

pub fn persist(&self, persist_request: PersistRequest) -> Option<SignedBatchInfo> {
let expiration = persist_request.value.info.expiration;
let expiration = persist_request.value.expiration();
// Network listener should filter messages with wrong expiration epoch.
assert_eq!(
expiration.epoch(),
Expand All @@ -367,7 +356,7 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {

match self.save(persist_request.digest, persist_request.value.clone()) {
Ok(needs_db) => {
let batch_info = persist_request.value.info.clone();
let batch_info = persist_request.value.batch_info().clone();
trace!("QS: sign digest {}", persist_request.digest);
if needs_db {
self.db
Expand Down Expand Up @@ -436,11 +425,7 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchStore<T> {

pub fn get_batch_from_local(&self, digest: &HashValue) -> Result<PersistedValue, Error> {
if let Some(value) = self.db_cache.get(digest) {
if payload_storage_mode(&value) == StorageMode::PersistedOnly {
assert!(
value.maybe_payload.is_none(),
"BatchReader payload and storage kind mismatch"
);
if value.payload_storage_mode() == StorageMode::PersistedOnly {
self.get_batch_from_db(digest)
} else {
// Available in memory.
Expand All @@ -464,9 +449,7 @@ pub trait BatchReader: Send + Sync {

impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchReader for BatchStore<T> {
fn exists(&self, digest: &HashValue) -> Option<PeerId> {
self.get_batch_from_local(digest)
.map(|v| v.info.author)
.ok()
self.get_batch_from_local(digest).map(|v| v.author()).ok()
}

fn get_batch(
Expand All @@ -475,8 +458,8 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchReader for Batch
) -> oneshot::Receiver<Result<Vec<SignedTransaction>, Error>> {
let (tx, rx) = oneshot::channel();

if let Ok(value) = self.get_batch_from_local(proof.digest()) {
tx.send(Ok(value.maybe_payload.expect("Must have payload")))
if let Ok(mut value) = self.get_batch_from_local(proof.digest()) {
tx.send(Ok(value.take_payload().expect("Must have payload")))
.unwrap();
} else {
// Quorum store metrics
Expand Down
Loading

0 comments on commit 510ffa0

Please sign in to comment.