diff --git a/consensus/consensus-types/src/common.rs b/consensus/consensus-types/src/common.rs index 8b3b2e6c9aa20..de514e7cd77e3 100644 --- a/consensus/consensus-types/src/common.rs +++ b/consensus/consensus-types/src/common.rs @@ -98,7 +98,7 @@ impl Payload { Payload::InQuorumStore(proof_with_status) => proof_with_status .proofs .iter() - .map(|proof| proof.info().num_txns as usize) + .map(|proof| proof.num_txns() as usize) .sum(), } } @@ -125,7 +125,7 @@ impl Payload { Payload::InQuorumStore(proof_with_status) => proof_with_status .proofs .iter() - .map(|proof| proof.info().num_bytes as usize) + .map(|proof| proof.num_bytes() as usize) .sum(), } } diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs index eec6faf41b136..0dd4427b46f08 100644 --- a/consensus/consensus-types/src/proof_of_store.rs +++ b/consensus/consensus-types/src/proof_of_store.rs @@ -14,6 +14,8 @@ use serde::{Deserialize, Serialize}; use std::{ cmp::Ordering, fmt::{Display, Formatter}, + hash::Hash, + ops::Deref, }; #[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)] @@ -87,12 +89,12 @@ impl Display for BatchId { Clone, Debug, Deserialize, Serialize, CryptoHasher, BCSCryptoHash, PartialEq, Eq, Hash, )] pub struct BatchInfo { - pub author: PeerId, - pub batch_id: BatchId, - pub expiration: LogicalTime, - pub digest: HashValue, - pub num_txns: u64, - pub num_bytes: u64, + author: PeerId, + batch_id: BatchId, + expiration: LogicalTime, + digest: HashValue, + num_txns: u64, + num_bytes: u64, } impl BatchInfo { @@ -117,12 +119,36 @@ impl BatchInfo { pub fn epoch(&self) -> u64 { self.expiration.epoch } + + pub fn author(&self) -> PeerId { + self.author + } + + pub fn batch_id(&self) -> BatchId { + self.batch_id + } + + pub fn expiration(&self) -> LogicalTime { + self.expiration + } + + pub fn digest(&self) -> &HashValue { + &self.digest + } + + pub fn num_txns(&self) -> u64 { + self.num_txns + } + + pub fn num_bytes(&self) -> u64 { + self.num_bytes + } } #[derive(Clone, Debug, Deserialize, Serialize)] pub struct SignedBatchInfo { - signer: PeerId, info: BatchInfo, + signer: PeerId, signature: bls12381::Signature, } @@ -134,8 +160,8 @@ impl SignedBatchInfo { let signature = validator_signer.sign(&batch_info)?; Ok(Self { - signer: validator_signer.author(), info: batch_info, + signer: validator_signer.author(), signature, }) } @@ -144,10 +170,6 @@ impl SignedBatchInfo { self.signer } - pub fn epoch(&self) -> u64 { - self.info.epoch() - } - pub fn verify(&self, sender: PeerId, validator: &ValidatorVerifier) -> anyhow::Result<()> { if sender == self.signer { Ok(validator.verify(self.signer, &self.info, &self.signature)?) @@ -156,16 +178,20 @@ impl SignedBatchInfo { } } - pub fn info(&self) -> &BatchInfo { - &self.info - } - pub fn signature(self) -> bls12381::Signature { self.signature } - pub fn digest(&self) -> HashValue { - self.info.digest + pub fn batch_info(&self) -> &BatchInfo { + &self.info + } +} + +impl Deref for SignedBatchInfo { + type Target = BatchInfo; + + fn deref(&self) -> &Self::Target { + &self.info } } @@ -191,18 +217,6 @@ impl ProofOfStore { } } - pub fn info(&self) -> &BatchInfo { - &self.info - } - - pub fn digest(&self) -> &HashValue { - &self.info.digest - } - - pub fn expiration(&self) -> LogicalTime { - self.info.expiration - } - pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> { validator .verify_multi_signatures(&self.info, &self.multi_signature) @@ -216,8 +230,12 @@ impl ProofOfStore { ret.shuffle(&mut thread_rng()); ret } +} - pub fn epoch(&self) -> u64 { - self.info.expiration.epoch +impl Deref for ProofOfStore { + type Target = BatchInfo; + + fn deref(&self) -> &Self::Target { + &self.info } } diff --git a/consensus/src/quorum_store/batch_coordinator.rs b/consensus/src/quorum_store/batch_coordinator.rs index b2b7eb46120e5..f8dd9bbb3b38e 100644 --- a/consensus/src/quorum_store/batch_coordinator.rs +++ b/consensus/src/quorum_store/batch_coordinator.rs @@ -25,7 +25,7 @@ pub struct BatchCoordinator { my_peer_id: PeerId, network_sender: NetworkSender, batch_store: Arc>, - max_batch_bytes: usize, + max_batch_bytes: u64, } impl BatchCoordinator { @@ -34,7 +34,7 @@ impl BatchCoordinator { my_peer_id: PeerId, network_sender: NetworkSender, batch_store: Arc>, - max_batch_bytes: usize, + max_batch_bytes: u64, ) -> Self { Self { epoch, @@ -83,7 +83,7 @@ impl BatchCoordinator { let network_sender = self.network_sender.clone(); let my_peer_id = self.my_peer_id; tokio::spawn(async move { - let peer_id = persist_request.value.info.author; + let peer_id = persist_request.value.author(); if let Some(signed_batch_info) = batch_store.persist(persist_request) { if my_peer_id != peer_id { counters::RECEIVED_REMOTE_BATCHES_COUNT.inc(); diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs index ffbf3d53d793b..5ac5f93a12892 100644 --- a/consensus/src/quorum_store/batch_requester.rs +++ b/consensus/src/quorum_store/batch_requester.rs @@ -131,7 +131,7 @@ impl BatchRequester { match response { Ok(batch) => { counters::RECEIVED_BATCH_RESPONSE_COUNT.inc(); - let digest = batch.digest(); + let digest = *batch.digest(); let payload = batch.into_transactions(); request_state.serve_request(digest, Some(payload)); return; diff --git a/consensus/src/quorum_store/batch_store.rs b/consensus/src/quorum_store/batch_store.rs index 0bc4781187167..73431530e3205 100644 --- a/consensus/src/quorum_store/batch_store.rs +++ b/consensus/src/quorum_store/batch_store.rs @@ -4,8 +4,11 @@ use crate::{ network::QuorumStoreSender, quorum_store::{ - batch_requester::BatchRequester, counters, quorum_store_db::QuorumStoreStorage, - types::PersistedValue, utils::RoundExpirations, + batch_requester::BatchRequester, + counters, + quorum_store_db::QuorumStoreStorage, + types::{PersistedValue, StorageMode}, + utils::RoundExpirations, }, }; use anyhow::bail; @@ -38,13 +41,6 @@ pub struct PersistRequest { pub digest: HashValue, pub value: PersistedValue, } - -#[derive(PartialEq)] -enum StorageMode { - PersistedOnly, - MemoryAndPersisted, -} - struct QuotaManager { memory_balance: usize, db_balance: usize, @@ -100,13 +96,6 @@ impl QuotaManager { } } -fn payload_storage_mode(persisted_value: &PersistedValue) -> StorageMode { - match persisted_value.maybe_payload { - Some(_) => StorageMode::MemoryAndPersisted, - None => StorageMode::PersistedOnly, - } -} - /// Provides in memory representation of stored batches (strong cache), and allows /// efficient concurrent readers. pub struct BatchStore { @@ -174,7 +163,7 @@ impl BatchStore { last_certified_round ); for (digest, value) in db_content { - let expiration = value.info.expiration; + let expiration = value.expiration(); trace!( "QS: Batchreader recovery content exp {:?}, digest {}", @@ -209,11 +198,11 @@ impl BatchStore { fn free_quota(&self, persisted_value: PersistedValue) { let mut quota_manager = self .peer_quota - .get_mut(&persisted_value.info.author) + .get_mut(&persisted_value.author()) .expect("No QuotaManager for batch author"); quota_manager.free_quota( - persisted_value.info.num_bytes as usize, - payload_storage_mode(&persisted_value), + persisted_value.num_bytes() as usize, + persisted_value.payload_storage_mode(), ); } @@ -230,15 +219,15 @@ impl BatchStore { digest: HashValue, mut value: PersistedValue, ) -> anyhow::Result { - let author = value.info.author; - let expiration_round = value.info.expiration.round(); + let author = value.author(); + let expiration_round = value.expiration().round(); { // Acquire dashmap internal lock on the entry corresponding to the digest. let cache_entry = self.db_cache.entry(digest); if let Occupied(entry) = &cache_entry { - if entry.get().info.expiration.round() >= value.info.expiration.round() { + if entry.get().expiration().round() >= value.expiration().round() { debug!( "QS: already have the digest with higher expiration {}", digest @@ -255,7 +244,7 @@ impl BatchStore { self.memory_quota, self.batch_quota, )) - .update_quota(value.info.num_bytes as usize)? + .update_quota(value.num_bytes() as usize)? == StorageMode::PersistedOnly { value.remove_payload(); @@ -282,7 +271,7 @@ impl BatchStore { } pub(crate) fn save(&self, digest: HashValue, value: PersistedValue) -> anyhow::Result { - let expiration = value.info.expiration; + let expiration = value.expiration(); if expiration.epoch() == self.epoch() { // record the round gaps if expiration.round() > self.last_certified_round() { @@ -339,7 +328,7 @@ impl BatchStore { // We need to check up-to-date expiration again because receiving the same // digest with a higher expiration would update the persisted value and // effectively extend the expiration. - if entry.get().info.expiration.round() <= expired_round { + if entry.get().expiration().round() <= expired_round { Some(entry.remove()) } else { None @@ -357,7 +346,7 @@ impl BatchStore { } pub fn persist(&self, persist_request: PersistRequest) -> Option { - let expiration = persist_request.value.info.expiration; + let expiration = persist_request.value.expiration(); // Network listener should filter messages with wrong expiration epoch. assert_eq!( expiration.epoch(), @@ -367,7 +356,7 @@ impl BatchStore { match self.save(persist_request.digest, persist_request.value.clone()) { Ok(needs_db) => { - let batch_info = persist_request.value.info.clone(); + let batch_info = persist_request.value.batch_info().clone(); trace!("QS: sign digest {}", persist_request.digest); if needs_db { self.db @@ -436,11 +425,7 @@ impl BatchStore { pub fn get_batch_from_local(&self, digest: &HashValue) -> Result { if let Some(value) = self.db_cache.get(digest) { - if payload_storage_mode(&value) == StorageMode::PersistedOnly { - assert!( - value.maybe_payload.is_none(), - "BatchReader payload and storage kind mismatch" - ); + if value.payload_storage_mode() == StorageMode::PersistedOnly { self.get_batch_from_db(digest) } else { // Available in memory. @@ -464,9 +449,7 @@ pub trait BatchReader: Send + Sync { impl BatchReader for BatchStore { fn exists(&self, digest: &HashValue) -> Option { - self.get_batch_from_local(digest) - .map(|v| v.info.author) - .ok() + self.get_batch_from_local(digest).map(|v| v.author()).ok() } fn get_batch( @@ -475,8 +458,8 @@ impl BatchReader for Batch ) -> oneshot::Receiver, Error>> { let (tx, rx) = oneshot::channel(); - if let Ok(value) = self.get_batch_from_local(proof.digest()) { - tx.send(Ok(value.maybe_payload.expect("Must have payload"))) + if let Ok(mut value) = self.get_batch_from_local(proof.digest()) { + tx.send(Ok(value.take_payload().expect("Must have payload"))) .unwrap(); } else { // Quorum store metrics diff --git a/consensus/src/quorum_store/proof_coordinator.rs b/consensus/src/quorum_store/proof_coordinator.rs index 894292515e070..ca0779ba6a1f3 100644 --- a/consensus/src/quorum_store/proof_coordinator.rs +++ b/consensus/src/quorum_store/proof_coordinator.rs @@ -54,7 +54,7 @@ impl IncrementalProofState { signed_batch_info: SignedBatchInfo, validator_verifier: &ValidatorVerifier, ) -> Result<(), SignedBatchInfoError> { - if signed_batch_info.info() != &self.info { + if signed_batch_info.batch_info() != &self.info { return Err(SignedBatchInfoError::WrongInfo); } @@ -156,25 +156,27 @@ impl ProofCoordinator { signed_batch_info: &SignedBatchInfo, ) -> Result<(), SignedBatchInfoError> { // Check if the signed digest corresponding to our batch - if signed_batch_info.info().author != self.peer_id { + if signed_batch_info.author() != self.peer_id { return Err(SignedBatchInfoError::WrongAuthor); } let batch_author = self .batch_reader - .exists(&signed_batch_info.digest()) + .exists(signed_batch_info.digest()) .ok_or(SignedBatchInfoError::WrongAuthor)?; - if batch_author != signed_batch_info.info().author { + if batch_author != signed_batch_info.author() { return Err(SignedBatchInfoError::WrongAuthor); } - self.timeouts - .add(signed_batch_info.info().clone(), self.proof_timeout_ms); + self.timeouts.add( + signed_batch_info.batch_info().clone(), + self.proof_timeout_ms, + ); self.digest_to_proof.insert( - signed_batch_info.digest(), - IncrementalProofState::new(signed_batch_info.info().clone()), + *signed_batch_info.digest(), + IncrementalProofState::new(signed_batch_info.batch_info().clone()), ); self.digest_to_time - .entry(signed_batch_info.digest()) + .entry(*signed_batch_info.digest()) .or_insert(chrono::Utc::now().naive_utc().timestamp_micros() as u64); Ok(()) } @@ -186,12 +188,12 @@ impl ProofCoordinator { ) -> Result, SignedBatchInfoError> { if !self .digest_to_proof - .contains_key(&signed_batch_info.digest()) + .contains_key(signed_batch_info.digest()) { self.init_proof(&signed_batch_info)?; } - let digest = signed_batch_info.digest(); - if let Some(value) = self.digest_to_proof.get_mut(&signed_batch_info.digest()) { + let digest = *signed_batch_info.digest(); + if let Some(value) = self.digest_to_proof.get_mut(signed_batch_info.digest()) { value.add_signature(signed_batch_info, validator_verifier)?; if !value.completed && value.ready(validator_verifier) { let proof = value.take(validator_verifier); @@ -211,7 +213,7 @@ impl ProofCoordinator { async fn expire(&mut self) { let mut batch_ids = vec![]; for signed_batch_info_info in self.timeouts.expire() { - if let Some(state) = self.digest_to_proof.remove(&signed_batch_info_info.digest) { + if let Some(state) = self.digest_to_proof.remove(signed_batch_info_info.digest()) { counters::BATCH_RECEIVED_REPLIES_COUNT .observe(state.aggregated_signature.len() as f64); counters::BATCH_RECEIVED_REPLIES_VOTING_POWER @@ -219,7 +221,7 @@ impl ProofCoordinator { counters::BATCH_SUCCESSFUL_CREATION.observe(u64::from(state.completed)); if !state.completed { counters::TIMEOUT_BATCHES_COUNT.inc(); - batch_ids.push(signed_batch_info_info.batch_id); + batch_ids.push(signed_batch_info_info.batch_id()); } } } @@ -252,7 +254,7 @@ impl ProofCoordinator { }, ProofCoordinatorCommand::AppendSignature(signed_batch_info) => { let peer_id = signed_batch_info.signer(); - let digest = signed_batch_info.digest(); + let digest = *signed_batch_info.digest(); match self.add_signature(signed_batch_info, &validator_verifier) { Ok(result) => { if let Some(proof) = result { diff --git a/consensus/src/quorum_store/proof_manager.rs b/consensus/src/quorum_store/proof_manager.rs index 81b219936f814..ad158553a0aa0 100644 --- a/consensus/src/quorum_store/proof_manager.rs +++ b/consensus/src/quorum_store/proof_manager.rs @@ -59,8 +59,8 @@ impl ProofManager { } pub(crate) fn receive_proof(&mut self, proof: ProofOfStore) { - let is_local = proof.info().author == self.my_peer_id; - let num_txns = proof.info().num_txns; + let is_local = proof.author() == self.my_peer_id; + let num_txns = proof.num_txns(); self.increment_remaining_txns(num_txns); self.proofs_for_consensus.push(proof, is_local); } diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index 0412767863899..b1fd3dedad82f 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -319,7 +319,7 @@ impl InnerBuilder { self.author, self.network_sender.clone(), self.batch_store.clone().unwrap(), - self.config.max_batch_bytes, + self.config.max_batch_bytes as u64, ); #[allow(unused_variables)] let name = format!("batch_coordinator-{}", i).as_str(); diff --git a/consensus/src/quorum_store/quorum_store_db.rs b/consensus/src/quorum_store/quorum_store_db.rs index a2795a2d6696f..113888286ffa6 100644 --- a/consensus/src/quorum_store/quorum_store_db.rs +++ b/consensus/src/quorum_store/quorum_store_db.rs @@ -82,7 +82,7 @@ impl QuorumStoreStorage for QuorumStoreDB { trace!( "QS: db persists digest {} expiration {:?}", digest, - batch.info.expiration + batch.expiration() ); Ok(self.db.put::(&digest, &batch)?) } diff --git a/consensus/src/quorum_store/tests/proof_coordinator_test.rs b/consensus/src/quorum_store/tests/proof_coordinator_test.rs index 5ce18a288fde0..f462ce0671297 100644 --- a/consensus/src/quorum_store/tests/proof_coordinator_test.rs +++ b/consensus/src/quorum_store/tests/proof_coordinator_test.rs @@ -59,7 +59,7 @@ async fn test_proof_coordinator_basic() { let digest = batch.digest(); for signer in &signers { - let signed_batch_info = SignedBatchInfo::new(batch.info().clone(), signer).unwrap(); + let signed_batch_info = SignedBatchInfo::new(batch.batch_info().clone(), signer).unwrap(); assert!(proof_coordinator_tx .send(ProofCoordinatorCommand::AppendSignature(signed_batch_info)) .await @@ -71,6 +71,6 @@ async fn test_proof_coordinator_basic() { msg => panic!("Expected LocalProof but received: {:?}", msg), }; // check normal path - assert_eq!(proof.digest().clone(), digest); + assert_eq!(proof.digest(), digest); assert!(proof.verify(&verifier).is_ok()); } diff --git a/consensus/src/quorum_store/types.rs b/consensus/src/quorum_store/types.rs index 3179d4ed67596..645d0a1857d30 100644 --- a/consensus/src/quorum_store/types.rs +++ b/consensus/src/quorum_store/types.rs @@ -12,11 +12,18 @@ use aptos_crypto_derive::CryptoHasher; use aptos_types::{transaction::SignedTransaction, PeerId}; use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; +use std::ops::Deref; #[derive(Clone, Eq, Deserialize, Serialize, PartialEq, Debug)] pub struct PersistedValue { - pub info: BatchInfo, - pub maybe_payload: Option>, + info: BatchInfo, + maybe_payload: Option>, +} + +#[derive(PartialEq)] +pub enum StorageMode { + PersistedOnly, + MemoryAndPersisted, } impl PersistedValue { @@ -27,9 +34,32 @@ impl PersistedValue { } } + pub fn payload_storage_mode(&self) -> StorageMode { + match self.maybe_payload { + Some(_) => StorageMode::MemoryAndPersisted, + None => StorageMode::PersistedOnly, + } + } + + pub fn take_payload(&mut self) -> Option> { + self.maybe_payload.take() + } + pub(crate) fn remove_payload(&mut self) { self.maybe_payload = None; } + + pub fn batch_info(&self) -> &BatchInfo { + &self.info + } +} + +impl Deref for PersistedValue { + type Target = BatchInfo; + + fn deref(&self) -> &Self::Target { + &self.info + } } impl TryFrom for Batch { @@ -118,49 +148,33 @@ impl Batch { pub fn verify(&self) -> anyhow::Result<()> { ensure!( - self.payload.hash() == self.batch_info.digest, + self.payload.hash() == *self.digest(), "Payload hash doesn't match the digest" ); ensure!( - self.payload.num_txns() as u64 == self.batch_info.num_txns, + self.payload.num_txns() as u64 == self.num_txns(), "Payload num txns doesn't match batch info" ); ensure!( - self.payload.num_bytes() as u64 == self.batch_info.num_bytes, + self.payload.num_bytes() as u64 == self.num_bytes(), "Payload num bytes doesn't match batch info" ); Ok(()) } - pub fn digest(&self) -> HashValue { - self.batch_info.digest - } - - pub fn epoch(&self) -> u64 { - self.batch_info.expiration.epoch() - } - pub fn into_transactions(self) -> Vec { self.payload.txns } - pub fn author(&self) -> PeerId { - self.batch_info.author - } - - pub fn batch_id(&self) -> BatchId { - self.batch_info.batch_id - } - - pub fn num_bytes(&self) -> usize { - self.payload.num_bytes() + pub fn batch_info(&self) -> &BatchInfo { + &self.batch_info } +} - pub fn expiration(&self) -> LogicalTime { - self.batch_info.expiration - } +impl Deref for Batch { + type Target = BatchInfo; - pub fn info(&self) -> &BatchInfo { + fn deref(&self) -> &Self::Target { &self.batch_info } } @@ -213,7 +227,7 @@ impl From for PersistRequest { payload, } = value; Self { - digest: batch_info.digest, + digest: *batch_info.digest(), value: PersistedValue::new(batch_info, Some(payload.into_transactions())), } } @@ -231,7 +245,7 @@ impl BatchMsg { pub fn verify(&self, peer_id: PeerId) -> anyhow::Result<()> { ensure!( - self.batch.batch_info.author == peer_id, + self.batch.author() == peer_id, "Batch author doesn't match sender" ); self.batch.verify() diff --git a/consensus/src/quorum_store/utils.rs b/consensus/src/quorum_store/utils.rs index bb86af12dd9a0..c25714c903ed1 100644 --- a/consensus/src/quorum_store/utils.rs +++ b/consensus/src/quorum_store/utils.rs @@ -240,8 +240,8 @@ impl ProofQueue { { if *expiration >= current_time { // non-committed proof that has not expired - cur_bytes += proof.info().num_bytes; - cur_txns += proof.info().num_txns; + cur_bytes += proof.num_bytes(); + cur_txns += proof.num_txns(); if cur_bytes > max_bytes || cur_txns > max_txns { // Exceeded the limit for requested bytes or number of transactions. full = true; @@ -294,7 +294,7 @@ impl ProofQueue { if *expiration >= current_time { // Not committed if let Some(Some(proof)) = self.digest_proof.get(digest) { - remaining_txns += proof.info().num_txns; + remaining_txns += proof.num_txns(); remaining_proofs += 1; } } diff --git a/testsuite/generate-format/tests/staged/consensus.yaml b/testsuite/generate-format/tests/staged/consensus.yaml index 08d2572995ea7..7d2df1cdc9e81 100644 --- a/testsuite/generate-format/tests/staged/consensus.yaml +++ b/testsuite/generate-format/tests/staged/consensus.yaml @@ -419,10 +419,10 @@ Signature: NEWTYPESTRUCT: BYTES SignedBatchInfo: STRUCT: - - signer: - TYPENAME: AccountAddress - info: TYPENAME: BatchInfo + - signer: + TYPENAME: AccountAddress - signature: TYPENAME: Signature SignedTransaction: