From 286d34eaab862ee1461ecccc07d430367560774e Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Mon, 5 Dec 2022 09:27:24 -0800 Subject: [PATCH 01/13] Quorum Store related changes to consensus networking. - Add messages for Fragment, Batch, SignedDigest, ProofOfStore - Add sender-only verification for some messages - Add a separate FIFO local channel for quorum store messages - Add and use broadcast without sending to self --- Cargo.lock | 1 + consensus/Cargo.toml | 1 + .../consensus-types/src/proof_of_store.rs | 54 ++++- consensus/src/counters.rs | 10 + consensus/src/epoch_manager.rs | 14 +- .../tests/buffer_manager_tests.rs | 2 +- consensus/src/network.rs | 108 +++++++-- consensus/src/network_interface.rs | 17 ++ consensus/src/quorum_store/mod.rs | 4 + consensus/src/quorum_store/types.rs | 207 ++++++++++++++++++ consensus/src/round_manager.rs | 38 +++- .../tests/staged/consensus.yaml | 69 ++++++ 12 files changed, 504 insertions(+), 21 deletions(-) create mode 100644 consensus/src/quorum_store/types.rs diff --git a/Cargo.lock b/Cargo.lock index 67ac0c438f048..1706b7ecffe47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -547,6 +547,7 @@ dependencies = [ "safety-rules", "schemadb", "serde 1.0.149", + "serde_bytes", "serde_json", "short-hex-str", "storage-interface", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index d8677cb25a443..7fe49a93337c8 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -50,6 +50,7 @@ rand = { workspace = true } safety-rules = { workspace = true } schemadb = { workspace = true } serde = { workspace = true } +serde_bytes = { workspace = true } serde_json = { workspace = true } short-hex-str = { workspace = true } storage-interface = { workspace = true } diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs index fcc98bba7ae94..c66fa7ff92752 100644 --- a/consensus/consensus-types/src/proof_of_store.rs +++ b/consensus/consensus-types/src/proof_of_store.rs @@ -3,11 +3,14 @@ use crate::common::Round; use anyhow::Context; -use aptos_crypto::HashValue; +use aptos_crypto::{bls12381, CryptoMaterialError, HashValue}; use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher}; use aptos_types::aggregate_signature::AggregateSignature; +use aptos_types::validator_signer::ValidatorSigner; use aptos_types::validator_verifier::ValidatorVerifier; +use aptos_types::PeerId; use serde::{Deserialize, Serialize}; +use std::sync::Arc; #[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)] pub struct LogicalTime { @@ -39,6 +42,55 @@ pub struct SignedDigestInfo { pub num_bytes: u64, } +impl SignedDigestInfo { + pub fn new(digest: HashValue, expiration: LogicalTime, num_txns: u64, num_bytes: u64) -> Self { + Self { + digest, + expiration, + num_txns, + num_bytes, + } + } +} + +// TODO: implement properly (and proper place) w.o. public fields. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct SignedDigest { + epoch: u64, + pub peer_id: PeerId, + pub info: SignedDigestInfo, + pub signature: bls12381::Signature, +} + +impl SignedDigest { + pub fn new( + epoch: u64, + digest: HashValue, + expiration: LogicalTime, + num_txns: u64, + num_bytes: u64, + validator_signer: Arc, + ) -> Result { + let info = SignedDigestInfo::new(digest, expiration, num_txns, num_bytes); + let signature = validator_signer.sign(&info)?; + + Ok(Self { + epoch, + peer_id: validator_signer.author(), + info, + signature, + }) + } + + pub fn epoch(&self) -> u64 { + self.epoch + } + + pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> { + Ok(validator.verify(self.peer_id, &self.info, &self.signature)?) + } +} + #[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)] #[allow(dead_code)] pub struct ProofOfStore { diff --git a/consensus/src/counters.rs b/consensus/src/counters.rs index 2b9eca340361d..a7171a8a6b1a0 100644 --- a/consensus/src/counters.rs +++ b/consensus/src/counters.rs @@ -491,6 +491,16 @@ pub static ROUND_MANAGER_CHANNEL_MSGS: Lazy = Lazy::new(|| { .unwrap() }); +/// Counters(queued,dequeued,dropped) related to quorum store channel +pub static QUORUM_STORE_CHANNEL_MSGS: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "aptos_quorum_store_channel_msgs_count", + "Counters(queued,dequeued,dropped) related to quorum store channel", + &["state"] + ) + .unwrap() +}); + /// Counters(queued,dequeued,dropped) related to block retrieval channel pub static BLOCK_RETRIEVAL_CHANNEL_MSGS: Lazy = Lazy::new(|| { register_int_counter_vec!( diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index 1273162e20d5b..7570dd7a74925 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -759,9 +759,11 @@ impl EpochManager { // same epoch -> run well-formedness + signature check let verified_event = monitor!( "verify_message", - unverified_event - .clone() - .verify(&self.epoch_state().verifier, self.quorum_store_enabled) + unverified_event.clone().verify( + peer_id, + &self.epoch_state().verifier, + self.quorum_store_enabled + ) ) .context("[EpochManager] Verify event") .map_err(|err| { @@ -790,7 +792,11 @@ impl EpochManager { | ConsensusMsg::SyncInfo(_) | ConsensusMsg::VoteMsg(_) | ConsensusMsg::CommitVoteMsg(_) - | ConsensusMsg::CommitDecisionMsg(_) => { + | ConsensusMsg::CommitDecisionMsg(_) + | ConsensusMsg::FragmentMsg(_) + | ConsensusMsg::BatchMsg(_) + | ConsensusMsg::SignedDigestMsg(_) + | ConsensusMsg::ProofOfStoreMsg(_) => { let event: UnverifiedEvent = msg.into(); if event.epoch() == self.epoch() { return Ok(Some(event)); diff --git a/consensus/src/experimental/tests/buffer_manager_tests.rs b/consensus/src/experimental/tests/buffer_manager_tests.rs index ad368fefb85f5..a7506846886b7 100644 --- a/consensus/src/experimental/tests/buffer_manager_tests.rs +++ b/consensus/src/experimental/tests/buffer_manager_tests.rs @@ -206,7 +206,7 @@ async fn loopback_commit_vote( let event: UnverifiedEvent = msg.into(); // verify the message and send the message into self loop msg_tx - .push(author, event.verify(verifier, false).unwrap()) + .push(author, event.verify(author, verifier, false).unwrap()) .ok(); } } diff --git a/consensus/src/network.rs b/consensus/src/network.rs index 740bb011c1a12..e402b7d2754bb 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::block_storage::tracing::{observe_block, BlockStage}; +use crate::quorum_store::types::{Batch, Fragment}; use crate::{ counters, logging::LogEvent, @@ -13,6 +14,7 @@ use aptos_consensus_types::{ block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse, MAX_BLOCKS_PER_REQUEST}, common::Author, experimental::{commit_decision::CommitDecision, commit_vote::CommitVote}, + proof_of_store::{ProofOfStore, SignedDigest}, proposal_msg::ProposalMsg, sync_info::SyncInfo, vote_msg::VoteMsg, @@ -55,10 +57,21 @@ pub struct NetworkReceivers { (AccountAddress, Discriminant), (AccountAddress, ConsensusMsg), >, + pub quorum_store_messages: aptos_channel::Receiver< + (AccountAddress, Discriminant), + (AccountAddress, ConsensusMsg), + >, pub block_retrieval: aptos_channel::Receiver, } +#[async_trait::async_trait] +pub trait QuorumStoreSender { + async fn send_batch(&self, batch: Batch, recipients: Vec); + + async fn send_signed_digest(&self, signed_digest: SignedDigest, recipients: Vec); +} + /// Implements the actual networking support for all consensus messaging. #[derive(Clone)] pub struct NetworkSender { @@ -142,6 +155,13 @@ impl NetworkSender { error!("Error broadcasting to self: {:?}", err); } + self.broadcast_without_self(msg).await; + } + + /// Tries to send the given msg to all the participants, excluding self. + async fn broadcast_without_self(&mut self, msg: ConsensusMsg) { + fail_point!("consensus::send::any", |_| ()); + // Get the list of validators excluding our own account address. Note the // ordering is not important in this case. let self_author = self.author; @@ -224,6 +244,22 @@ impl NetworkSender { self.send(msg, vec![recipient]).await } + // TODO: remove allow(dead_code) when quorum store implementation is added + #[allow(dead_code)] + pub async fn broadcast_fragment(&mut self, fragment: Fragment) { + fail_point!("consensus::send::broadcast_fragment", |_| ()); + let msg = ConsensusMsg::FragmentMsg(Box::new(fragment)); + self.broadcast_without_self(msg).await + } + + // TODO: remove allow(dead_code) when quorum store implementation is added + #[allow(dead_code)] + pub async fn broadcast_proof_of_store(&mut self, proof_of_store: ProofOfStore) { + fail_point!("consensus::send::proof_of_store", |_| ()); + let msg = ConsensusMsg::ProofOfStoreMsg(Box::new(proof_of_store)); + self.broadcast_without_self(msg).await + } + /// Sends the vote to the chosen recipients (typically that would be the recipients that /// we believe could serve as proposers in the next round). The recipients on the receiving /// end are going to be notified about a new vote in the vote queue. @@ -271,11 +307,30 @@ impl NetworkSender { } } +#[async_trait::async_trait] +impl QuorumStoreSender for NetworkSender { + async fn send_batch(&self, batch: Batch, recipients: Vec) { + fail_point!("consensus::send_batch", |_| ()); + let msg = ConsensusMsg::BatchMsg(Box::new(batch)); + self.send(msg, recipients).await + } + + async fn send_signed_digest(&self, signed_digest: SignedDigest, recipients: Vec) { + fail_point!("consensus::send_signed_digest", |_| ()); + let msg = ConsensusMsg::SignedDigestMsg(Box::new(signed_digest)); + self.send(msg, recipients).await + } +} + pub struct NetworkTask { consensus_messages_tx: aptos_channel::Sender< (AccountAddress, Discriminant), (AccountAddress, ConsensusMsg), >, + quorum_store_messages_tx: aptos_channel::Sender< + (AccountAddress, Discriminant), + (AccountAddress, ConsensusMsg), + >, block_retrieval_tx: aptos_channel::Sender, all_events: Box> + Send + Unpin>, @@ -289,6 +344,11 @@ impl NetworkTask { ) -> (NetworkTask, NetworkReceivers) { let (consensus_messages_tx, consensus_messages) = aptos_channel::new(QueueStyle::LIFO, 1, Some(&counters::CONSENSUS_CHANNEL_MSGS)); + let (quorum_store_messages_tx, quorum_store_messages) = aptos_channel::new( + QueueStyle::FIFO, + 1000, + Some(&counters::QUORUM_STORE_CHANNEL_MSGS), + ); let (block_retrieval_tx, block_retrieval) = aptos_channel::new( QueueStyle::LIFO, 1, @@ -298,11 +358,13 @@ impl NetworkTask { ( NetworkTask { consensus_messages_tx, + quorum_store_messages_tx, block_retrieval_tx, all_events, }, NetworkReceivers { consensus_messages, + quorum_store_messages, block_retrieval, }, ) @@ -315,20 +377,38 @@ impl NetworkTask { counters::CONSENSUS_RECEIVED_MSGS .with_label_values(&[msg.name()]) .inc(); - if let ConsensusMsg::ProposalMsg(proposal) = &msg { - observe_block( - proposal.proposal().timestamp_usecs(), - BlockStage::NETWORK_RECEIVED, - ); - } - if let Err(e) = self - .consensus_messages_tx - .push((peer_id, discriminant(&msg)), (peer_id, msg)) - { - warn!( - remote_peer = peer_id, - error = ?e, "Error pushing consensus msg", - ); + match msg { + quorum_store_msg @ (ConsensusMsg::SignedDigestMsg(_) + | ConsensusMsg::FragmentMsg(_) + | ConsensusMsg::BatchMsg(_) + | ConsensusMsg::ProofOfStoreMsg(_)) => { + if let Err(e) = self.quorum_store_messages_tx.push( + (peer_id, discriminant(&quorum_store_msg)), + (peer_id, quorum_store_msg), + ) { + warn!( + remote_peer = peer_id, + error = ?e, "Error pushing consensus quorum store msg", + ); + } + } + consensus_msg => { + if let ConsensusMsg::ProposalMsg(proposal) = &consensus_msg { + observe_block( + proposal.proposal().timestamp_usecs(), + BlockStage::NETWORK_RECEIVED, + ); + } + if let Err(e) = self.consensus_messages_tx.push( + (peer_id, discriminant(&consensus_msg)), + (peer_id, consensus_msg), + ) { + warn!( + remote_peer = peer_id, + error = ?e, "Error pushing consensus msg", + ); + } + } } } Event::RpcRequest(peer_id, msg, protocol, callback) => match msg { diff --git a/consensus/src/network_interface.rs b/consensus/src/network_interface.rs index 333f03db0bbfb..8482c39523111 100644 --- a/consensus/src/network_interface.rs +++ b/consensus/src/network_interface.rs @@ -4,8 +4,10 @@ //! Interface between Consensus and Network layers. use crate::counters; +use crate::quorum_store::types::{Batch, Fragment}; use anyhow::anyhow; use aptos_config::network_id::{NetworkId, PeerNetworkId}; +use aptos_consensus_types::proof_of_store::{ProofOfStore, SignedDigest}; use aptos_consensus_types::{ block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse}, epoch_retrieval::EpochRetrievalRequest, @@ -62,6 +64,17 @@ pub enum ConsensusMsg { /// than 2f + 1 signatures on the commit proposal. This part is not on the critical path, but /// it can save slow machines to quickly confirm the execution result. CommitDecisionMsg(Box), + /// Quorum Store: Send a fragment -- a sequence of transactions that are part of an in-progress + /// batch -- from the fragment generator to remote validators. + FragmentMsg(Box), + /// Quorum Store: Request or response for a completed batch -- a sequence of transactions, + /// identified by its digest. + BatchMsg(Box), + /// Quorum Store: Send a signed batch digest. This is a vote for the batch and a promise that + /// the batch of transactions was received and will be persisted until batch expiration. + SignedDigestMsg(Box), + /// Quorum Store: Broadcast a completed proof of store (a digest that received 2f+1 votes). + ProofOfStoreMsg(Box), } /// Network type for consensus @@ -79,6 +92,10 @@ impl ConsensusMsg { ConsensusMsg::VoteMsg(_) => "VoteMsg", ConsensusMsg::CommitVoteMsg(_) => "CommitVoteMsg", ConsensusMsg::CommitDecisionMsg(_) => "CommitDecisionMsg", + ConsensusMsg::FragmentMsg(_) => "FragmentMsg", + ConsensusMsg::BatchMsg(_) => "BatchMsg", + ConsensusMsg::SignedDigestMsg(_) => "SignedDigestMsg", + ConsensusMsg::ProofOfStoreMsg(_) => "ProofOfStoreMsg", } } } diff --git a/consensus/src/quorum_store/mod.rs b/consensus/src/quorum_store/mod.rs index 66aac53134730..ff74d6763d2b4 100644 --- a/consensus/src/quorum_store/mod.rs +++ b/consensus/src/quorum_store/mod.rs @@ -5,6 +5,10 @@ pub mod direct_mempool_quorum_store; pub(crate) mod batch_reader; +// TODO: remove allow(dead_code) when quorum store implementation is added +#[allow(dead_code)] +pub(crate) mod types; + mod counters; #[cfg(test)] mod tests; diff --git a/consensus/src/quorum_store/types.rs b/consensus/src/quorum_store/types.rs new file mode 100644 index 0000000000000..ff297193243dd --- /dev/null +++ b/consensus/src/quorum_store/types.rs @@ -0,0 +1,207 @@ +// Copyright (c) Aptos +// SPDX-License-Identifier: Apache-2.0 + +use aptos_consensus_types::proof_of_store::LogicalTime; +use aptos_crypto::HashValue; +use aptos_types::transaction::SignedTransaction; +use aptos_types::PeerId; +use bcs::to_bytes; +use serde::{Deserialize, Serialize}; +use std::mem; + +pub(crate) type BatchId = u64; + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub struct SerializedTransaction { + // pub(crate) for testing purposes + #[serde(with = "serde_bytes")] + pub(crate) bytes: Vec, +} + +impl SerializedTransaction { + pub fn from_signed_txn(txn: &SignedTransaction) -> Self { + Self { + bytes: to_bytes(&txn).unwrap(), + } + } + + pub fn len(&self) -> usize { + self.bytes.len() + } + + pub fn bytes(&self) -> &Vec { + &self.bytes + } + + pub fn take_bytes(&mut self) -> Vec { + mem::take(&mut self.bytes) + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct FragmentInfo { + epoch: u64, + batch_id: u64, + fragment_id: usize, + payload: Vec, + maybe_expiration: Option, +} + +impl FragmentInfo { + fn new( + epoch: u64, + batch_id: u64, + fragment_id: usize, + fragment_payload: Vec, + maybe_expiration: Option, + ) -> Self { + Self { + epoch, + batch_id, + fragment_id, + payload: fragment_payload, + maybe_expiration, + } + } + + pub(crate) fn take_transactions(self) -> Vec { + self.payload + } + + pub(crate) fn fragment_id(&self) -> usize { + self.fragment_id + } + + pub(crate) fn batch_id(&self) -> BatchId { + self.batch_id + } + + pub(crate) fn maybe_expiration(&self) -> Option { + self.maybe_expiration + } +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Fragment { + pub source: PeerId, + pub fragment_info: FragmentInfo, +} + +impl Fragment { + pub fn new( + epoch: u64, + batch_id: u64, + fragment_id: usize, + fragment_payload: Vec, + maybe_expiration: Option, + peer_id: PeerId, + ) -> Self { + let fragment_info = FragmentInfo::new( + epoch, + batch_id, + fragment_id, + fragment_payload, + maybe_expiration, + ); + Self { + source: peer_id, + fragment_info, + } + } + + pub(crate) fn verify(&self, peer_id: PeerId) -> anyhow::Result<()> { + if let Some(expiration) = &self.fragment_info.maybe_expiration { + if expiration.epoch() != self.fragment_info.epoch { + return Err(anyhow::anyhow!( + "Epoch mismatch: info: {}, expiration: {}", + expiration.epoch(), + self.fragment_info.epoch + )); + } + } + if self.source == peer_id { + Ok(()) + } else { + Err(anyhow::anyhow!( + "Sender mismatch: peer_id: {}, source: {}", + self.source, + peer_id + )) + } + } + + pub(crate) fn epoch(&self) -> u64 { + self.fragment_info.epoch + } + + pub(crate) fn take_transactions(self) -> Vec { + self.fragment_info.take_transactions() + } + + pub(crate) fn source(&self) -> PeerId { + self.source + } + + pub(crate) fn fragment_id(&self) -> usize { + self.fragment_info.fragment_id() + } + + pub(crate) fn batch_id(&self) -> BatchId { + self.fragment_info.batch_id() + } +} + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub struct BatchInfo { + pub(crate) epoch: u64, + pub(crate) digest: HashValue, +} + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub struct Batch { + pub(crate) source: PeerId, + // None is a request, Some(payload) is a response. + pub(crate) maybe_payload: Option>, + pub(crate) batch_info: BatchInfo, +} + +// TODO: make epoch, source, signature fields treatment consistent across structs. +impl Batch { + pub fn new( + epoch: u64, + source: PeerId, + digest_hash: HashValue, + maybe_payload: Option>, + ) -> Self { + let batch_info = BatchInfo { + epoch, + digest: digest_hash, + }; + Self { + source, + maybe_payload, + batch_info, + } + } + + pub fn epoch(&self) -> u64 { + self.batch_info.epoch + } + + // Check the source == the sender. To protect from DDoS we check is Payload matches digest later. + pub fn verify(&self, peer_id: PeerId) -> anyhow::Result<()> { + if self.source == peer_id { + Ok(()) + } else { + Err(anyhow::anyhow!( + "Sender mismatch: peer_id: {}, source: {}", + self.source, + peer_id + )) + } + } + + pub fn get_payload(self) -> Vec { + self.maybe_payload.expect("Batch contains no payload") + } +} diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index 95c2066dc57c8..2d7c3be54f309 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -1,6 +1,7 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 +use crate::quorum_store::types::{Batch, Fragment}; use crate::{ block_storage::{ tracing::{observe_block, BlockStage}, @@ -28,6 +29,7 @@ use aptos_consensus_types::{ block::Block, common::{Author, Round}, experimental::{commit_decision::CommitDecision, commit_vote::CommitVote}, + proof_of_store::{ProofOfStore, SignedDigest}, proposal_msg::ProposalMsg, quorum_cert::QuorumCert, sync_info::SyncInfo, @@ -39,7 +41,7 @@ use aptos_infallible::{checked, Mutex}; use aptos_logger::prelude::*; use aptos_types::{ epoch_state::EpochState, on_chain_config::OnChainConsensusConfig, - validator_verifier::ValidatorVerifier, + validator_verifier::ValidatorVerifier, PeerId, }; use channel::aptos_channel; use fail::fail_point; @@ -62,6 +64,10 @@ pub enum UnverifiedEvent { SyncInfo(Box), CommitVote(Box), CommitDecision(Box), + FragmentMsg(Box), + BatchMsg(Box), + SignedDigestMsg(Box), + ProofOfStoreMsg(Box), } pub const BACK_PRESSURE_POLLING_INTERVAL_MS: u64 = 10; @@ -69,6 +75,7 @@ pub const BACK_PRESSURE_POLLING_INTERVAL_MS: u64 = 10; impl UnverifiedEvent { pub fn verify( self, + peer_id: PeerId, validator: &ValidatorVerifier, quorum_store_enabled: bool, ) -> Result { @@ -91,6 +98,23 @@ impl UnverifiedEvent { cd.verify(validator)?; VerifiedEvent::CommitDecision(cd) } + UnverifiedEvent::FragmentMsg(f) => { + f.verify(peer_id)?; + VerifiedEvent::FragmentMsg(f) + } + // Only sender is verified. Remaining verification is on-demand (when it's used). + UnverifiedEvent::BatchMsg(b) => { + b.verify(peer_id)?; + VerifiedEvent::UnverifiedBatchMsg(b) + } + UnverifiedEvent::SignedDigestMsg(sd) => { + sd.verify(validator)?; + VerifiedEvent::SignedDigestMsg(sd) + } + UnverifiedEvent::ProofOfStoreMsg(p) => { + p.verify(validator)?; + VerifiedEvent::ProofOfStoreMsg(p) + } }) } @@ -101,6 +125,10 @@ impl UnverifiedEvent { UnverifiedEvent::SyncInfo(s) => s.epoch(), UnverifiedEvent::CommitVote(cv) => cv.epoch(), UnverifiedEvent::CommitDecision(cd) => cd.epoch(), + UnverifiedEvent::FragmentMsg(f) => f.epoch(), + UnverifiedEvent::BatchMsg(b) => b.epoch(), + UnverifiedEvent::SignedDigestMsg(sd) => sd.epoch(), + UnverifiedEvent::ProofOfStoreMsg(p) => p.epoch(), } } } @@ -113,6 +141,10 @@ impl From for UnverifiedEvent { ConsensusMsg::SyncInfo(m) => UnverifiedEvent::SyncInfo(m), ConsensusMsg::CommitVoteMsg(m) => UnverifiedEvent::CommitVote(m), ConsensusMsg::CommitDecisionMsg(m) => UnverifiedEvent::CommitDecision(m), + ConsensusMsg::FragmentMsg(m) => UnverifiedEvent::FragmentMsg(m), + ConsensusMsg::BatchMsg(m) => UnverifiedEvent::BatchMsg(m), + ConsensusMsg::SignedDigestMsg(m) => UnverifiedEvent::SignedDigestMsg(m), + ConsensusMsg::ProofOfStoreMsg(m) => UnverifiedEvent::ProofOfStoreMsg(m), _ => unreachable!("Unexpected conversion"), } } @@ -127,6 +159,10 @@ pub enum VerifiedEvent { UnverifiedSyncInfo(Box), CommitVote(Box), CommitDecision(Box), + FragmentMsg(Box), + UnverifiedBatchMsg(Box), + SignedDigestMsg(Box), + ProofOfStoreMsg(Box), // local messages LocalTimeout(Round), } diff --git a/testsuite/generate-format/tests/staged/consensus.yaml b/testsuite/generate-format/tests/staged/consensus.yaml index 277bf23ea47c2..c8c7716e40a67 100644 --- a/testsuite/generate-format/tests/staged/consensus.yaml +++ b/testsuite/generate-format/tests/staged/consensus.yaml @@ -38,6 +38,21 @@ AggregateSignatureWithRounds: TYPENAME: AggregateSignature - rounds: SEQ: U64 +Batch: + STRUCT: + - source: + TYPENAME: AccountAddress + - maybe_payload: + OPTION: + SEQ: + TYPENAME: SignedTransaction + - batch_info: + TYPENAME: BatchInfo +BatchInfo: + STRUCT: + - epoch: U64 + - digest: + TYPENAME: HashValue BitVec: STRUCT: - inner: BYTES @@ -190,6 +205,22 @@ ConsensusMsg: CommitDecisionMsg: NEWTYPE: TYPENAME: CommitDecision + 9: + FragmentMsg: + NEWTYPE: + TYPENAME: Fragment + 10: + BatchMsg: + NEWTYPE: + TYPENAME: Batch + 11: + SignedDigestMsg: + NEWTYPE: + TYPENAME: SignedDigest + 12: + ProofOfStoreMsg: + NEWTYPE: + TYPENAME: ProofOfStore ContractEvent: ENUM: 0: @@ -239,6 +270,23 @@ EventKey: - creation_number: U64 - account_address: TYPENAME: AccountAddress +Fragment: + STRUCT: + - source: + TYPENAME: AccountAddress + - fragment_info: + TYPENAME: FragmentInfo +FragmentInfo: + STRUCT: + - epoch: U64 + - batch_id: U64 + - fragment_id: U64 + - payload: + SEQ: + TYPENAME: SerializedTransaction + - maybe_expiration: + OPTION: + TYPENAME: LogicalTime HashValue: STRUCT: - hash: @@ -294,21 +342,27 @@ Payload: NEWTYPE: SEQ: TYPENAME: SignedTransaction +<<<<<<< HEAD 1: InQuorumStore: NEWTYPE: TYPENAME: ProofWithData +======= +>>>>>>> f336cfc341 (Quorum Store related changes to consensus networking.) ProofOfStore: STRUCT: - info: TYPENAME: SignedDigestInfo - multi_signature: TYPENAME: AggregateSignature +<<<<<<< HEAD ProofWithData: STRUCT: - proofs: SEQ: TYPENAME: ProofOfStore +======= +>>>>>>> f336cfc341 (Quorum Store related changes to consensus networking.) ProposalMsg: STRUCT: - proposal: @@ -344,8 +398,23 @@ Script: - args: SEQ: TYPENAME: TransactionArgument +SerializedTransaction: + STRUCT: + - bytes: BYTES Signature: NEWTYPESTRUCT: BYTES +<<<<<<< HEAD +======= +SignedDigest: + STRUCT: + - epoch: U64 + - peer_id: + TYPENAME: AccountAddress + - info: + TYPENAME: SignedDigestInfo + - signature: + TYPENAME: Signature +>>>>>>> f336cfc341 (Quorum Store related changes to consensus networking.) SignedDigestInfo: STRUCT: - digest: From 86ba9eeaaf14d2501247fbc8a8f9afa5b42bdab7 Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Thu, 8 Dec 2022 15:08:28 -0800 Subject: [PATCH 02/13] do generate again --- testsuite/generate-format/tests/staged/consensus.yaml | 9 --------- 1 file changed, 9 deletions(-) diff --git a/testsuite/generate-format/tests/staged/consensus.yaml b/testsuite/generate-format/tests/staged/consensus.yaml index c8c7716e40a67..0b926635641fe 100644 --- a/testsuite/generate-format/tests/staged/consensus.yaml +++ b/testsuite/generate-format/tests/staged/consensus.yaml @@ -342,27 +342,21 @@ Payload: NEWTYPE: SEQ: TYPENAME: SignedTransaction -<<<<<<< HEAD 1: InQuorumStore: NEWTYPE: TYPENAME: ProofWithData -======= ->>>>>>> f336cfc341 (Quorum Store related changes to consensus networking.) ProofOfStore: STRUCT: - info: TYPENAME: SignedDigestInfo - multi_signature: TYPENAME: AggregateSignature -<<<<<<< HEAD ProofWithData: STRUCT: - proofs: SEQ: TYPENAME: ProofOfStore -======= ->>>>>>> f336cfc341 (Quorum Store related changes to consensus networking.) ProposalMsg: STRUCT: - proposal: @@ -403,8 +397,6 @@ SerializedTransaction: - bytes: BYTES Signature: NEWTYPESTRUCT: BYTES -<<<<<<< HEAD -======= SignedDigest: STRUCT: - epoch: U64 @@ -414,7 +406,6 @@ SignedDigest: TYPENAME: SignedDigestInfo - signature: TYPENAME: Signature ->>>>>>> f336cfc341 (Quorum Store related changes to consensus networking.) SignedDigestInfo: STRUCT: - digest: From ea2068a06cf0dceebdddab17d4097d99ccdd13b7 Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Thu, 8 Dec 2022 15:32:40 -0800 Subject: [PATCH 03/13] some cleanup --- consensus/consensus-types/src/proof_of_store.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs index c66fa7ff92752..cfcf607e39736 100644 --- a/consensus/consensus-types/src/proof_of_store.rs +++ b/consensus/consensus-types/src/proof_of_store.rs @@ -92,13 +92,11 @@ impl SignedDigest { } #[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)] -#[allow(dead_code)] pub struct ProofOfStore { info: SignedDigestInfo, multi_signature: AggregateSignature, } -#[allow(dead_code)] impl ProofOfStore { pub fn new(info: SignedDigestInfo, multi_signature: AggregateSignature) -> Self { Self { From 854a42d5bce7683f2e269388e9908d3e6f9bd15a Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Thu, 8 Dec 2022 16:18:33 -0800 Subject: [PATCH 04/13] Additional verify params --- consensus/consensus-types/src/common.rs | 5 ++-- .../consensus-types/src/proof_of_store.rs | 28 +++++++++++++++++-- consensus/consensus-types/src/proposal_msg.rs | 14 +++++++--- consensus/src/quorum_store/types.rs | 18 ++++++++++-- consensus/src/round_manager.rs | 10 +++---- 5 files changed, 60 insertions(+), 15 deletions(-) diff --git a/consensus/consensus-types/src/common.rs b/consensus/consensus-types/src/common.rs index 4592c7d40e30c..6fe16703fc2d5 100644 --- a/consensus/consensus-types/src/common.rs +++ b/consensus/consensus-types/src/common.rs @@ -6,7 +6,7 @@ use aptos_crypto::HashValue; use aptos_executor_types::Error; use aptos_infallible::Mutex; use aptos_types::validator_verifier::ValidatorVerifier; -use aptos_types::{account_address::AccountAddress, transaction::SignedTransaction}; +use aptos_types::{account_address::AccountAddress, transaction::SignedTransaction, PeerId}; use rayon::prelude::*; use serde::{Deserialize, Serialize}; use std::collections::HashSet; @@ -132,6 +132,7 @@ impl Payload { pub fn verify( &self, + peer_id: PeerId, validator: &ValidatorVerifier, quorum_store_enabled: bool, ) -> anyhow::Result<()> { @@ -139,7 +140,7 @@ impl Payload { (false, Payload::DirectMempool(_)) => Ok(()), (true, Payload::InQuorumStore(proof_with_status)) => { for proof in proof_with_status.proofs.iter() { - proof.verify(validator)?; + proof.verify(peer_id, validator, quorum_store_enabled)?; } Ok(()) } diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs index cfcf607e39736..7d5e7b1950918 100644 --- a/consensus/consensus-types/src/proof_of_store.rs +++ b/consensus/consensus-types/src/proof_of_store.rs @@ -86,7 +86,19 @@ impl SignedDigest { self.epoch } - pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> { + pub fn verify( + &self, + peer_id: PeerId, + validator: &ValidatorVerifier, + quorum_store_enabled: bool, + ) -> anyhow::Result<()> { + if !quorum_store_enabled { + return Err(anyhow::anyhow!( + "Quorum store is not enabled locally. Sender: {}, epoch: {}", + peer_id, + self.epoch(), + )); + } Ok(validator.verify(self.peer_id, &self.info, &self.signature)?) } } @@ -117,7 +129,19 @@ impl ProofOfStore { self.info.expiration } - pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> { + pub fn verify( + &self, + peer_id: PeerId, + validator: &ValidatorVerifier, + quorum_store_enabled: bool, + ) -> anyhow::Result<()> { + if !quorum_store_enabled { + return Err(anyhow::anyhow!( + "Quorum store is not enabled locally. Sender: {}, epoch: {}", + peer_id, + self.epoch(), + )); + } validator .verify_multi_signatures(&self.info, &self.multi_signature) .context("Failed to verify ProofOfStore") diff --git a/consensus/consensus-types/src/proposal_msg.rs b/consensus/consensus-types/src/proposal_msg.rs index 0a1eac4c129ff..5532f47c64e6b 100644 --- a/consensus/consensus-types/src/proposal_msg.rs +++ b/consensus/consensus-types/src/proposal_msg.rs @@ -4,6 +4,7 @@ use crate::{block::Block, common::Author, sync_info::SyncInfo}; use anyhow::{anyhow, ensure, format_err, Context, Result}; use aptos_types::validator_verifier::ValidatorVerifier; +use aptos_types::PeerId; use serde::{Deserialize, Serialize}; use short_hex_str::AsShortHexStr; use std::fmt; @@ -79,10 +80,15 @@ impl ProposalMsg { Ok(()) } - pub fn verify(&self, validator: &ValidatorVerifier, quorum_store_enabled: bool) -> Result<()> { - self.proposal() - .payload() - .map_or(Ok(()), |p| p.verify(validator, quorum_store_enabled))?; + pub fn verify( + &self, + peer_id: PeerId, + validator: &ValidatorVerifier, + quorum_store_enabled: bool, + ) -> Result<()> { + self.proposal().payload().map_or(Ok(()), |p| { + p.verify(peer_id, validator, quorum_store_enabled) + })?; self.proposal() .validate_signature(validator) diff --git a/consensus/src/quorum_store/types.rs b/consensus/src/quorum_store/types.rs index ff297193243dd..177a89895e1ba 100644 --- a/consensus/src/quorum_store/types.rs +++ b/consensus/src/quorum_store/types.rs @@ -109,7 +109,14 @@ impl Fragment { } } - pub(crate) fn verify(&self, peer_id: PeerId) -> anyhow::Result<()> { + pub(crate) fn verify(&self, peer_id: PeerId, quorum_store_enabled: bool) -> anyhow::Result<()> { + if !quorum_store_enabled { + return Err(anyhow::anyhow!( + "Quorum store is not enabled locally. Sender: {}, epoch: {}", + peer_id, + self.epoch(), + )); + } if let Some(expiration) = &self.fragment_info.maybe_expiration { if expiration.epoch() != self.fragment_info.epoch { return Err(anyhow::anyhow!( @@ -189,7 +196,14 @@ impl Batch { } // Check the source == the sender. To protect from DDoS we check is Payload matches digest later. - pub fn verify(&self, peer_id: PeerId) -> anyhow::Result<()> { + pub fn verify(&self, peer_id: PeerId, quorum_store_enabled: bool) -> anyhow::Result<()> { + if !quorum_store_enabled { + return Err(anyhow::anyhow!( + "Quorum store is not enabled locally. Sender: {}, epoch: {}", + peer_id, + self.epoch(), + )); + } if self.source == peer_id { Ok(()) } else { diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index 2d7c3be54f309..a51ee553bfc64 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -81,7 +81,7 @@ impl UnverifiedEvent { ) -> Result { Ok(match self { UnverifiedEvent::ProposalMsg(p) => { - p.verify(validator, quorum_store_enabled)?; + p.verify(peer_id, validator, quorum_store_enabled)?; VerifiedEvent::ProposalMsg(p) } UnverifiedEvent::VoteMsg(v) => { @@ -99,20 +99,20 @@ impl UnverifiedEvent { VerifiedEvent::CommitDecision(cd) } UnverifiedEvent::FragmentMsg(f) => { - f.verify(peer_id)?; + f.verify(peer_id, quorum_store_enabled)?; VerifiedEvent::FragmentMsg(f) } // Only sender is verified. Remaining verification is on-demand (when it's used). UnverifiedEvent::BatchMsg(b) => { - b.verify(peer_id)?; + b.verify(peer_id, quorum_store_enabled)?; VerifiedEvent::UnverifiedBatchMsg(b) } UnverifiedEvent::SignedDigestMsg(sd) => { - sd.verify(validator)?; + sd.verify(peer_id, validator, quorum_store_enabled)?; VerifiedEvent::SignedDigestMsg(sd) } UnverifiedEvent::ProofOfStoreMsg(p) => { - p.verify(validator)?; + p.verify(peer_id, validator, quorum_store_enabled)?; VerifiedEvent::ProofOfStoreMsg(p) } }) From 8e27878e4a56b25028decf9e5cf484af19265f68 Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Fri, 9 Dec 2022 10:07:42 -0800 Subject: [PATCH 05/13] Move quorum_store_enabled check to epoch_manager --- consensus/consensus-types/src/common.rs | 5 ++-- .../consensus-types/src/proof_of_store.rs | 28 ++----------------- consensus/consensus-types/src/proposal_msg.rs | 14 +++------- consensus/src/epoch_manager.rs | 27 ++++++++++++++++++ consensus/src/quorum_store/types.rs | 18 ++---------- consensus/src/round_manager.rs | 10 +++---- 6 files changed, 42 insertions(+), 60 deletions(-) diff --git a/consensus/consensus-types/src/common.rs b/consensus/consensus-types/src/common.rs index 6fe16703fc2d5..4592c7d40e30c 100644 --- a/consensus/consensus-types/src/common.rs +++ b/consensus/consensus-types/src/common.rs @@ -6,7 +6,7 @@ use aptos_crypto::HashValue; use aptos_executor_types::Error; use aptos_infallible::Mutex; use aptos_types::validator_verifier::ValidatorVerifier; -use aptos_types::{account_address::AccountAddress, transaction::SignedTransaction, PeerId}; +use aptos_types::{account_address::AccountAddress, transaction::SignedTransaction}; use rayon::prelude::*; use serde::{Deserialize, Serialize}; use std::collections::HashSet; @@ -132,7 +132,6 @@ impl Payload { pub fn verify( &self, - peer_id: PeerId, validator: &ValidatorVerifier, quorum_store_enabled: bool, ) -> anyhow::Result<()> { @@ -140,7 +139,7 @@ impl Payload { (false, Payload::DirectMempool(_)) => Ok(()), (true, Payload::InQuorumStore(proof_with_status)) => { for proof in proof_with_status.proofs.iter() { - proof.verify(peer_id, validator, quorum_store_enabled)?; + proof.verify(validator)?; } Ok(()) } diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs index 7d5e7b1950918..cfcf607e39736 100644 --- a/consensus/consensus-types/src/proof_of_store.rs +++ b/consensus/consensus-types/src/proof_of_store.rs @@ -86,19 +86,7 @@ impl SignedDigest { self.epoch } - pub fn verify( - &self, - peer_id: PeerId, - validator: &ValidatorVerifier, - quorum_store_enabled: bool, - ) -> anyhow::Result<()> { - if !quorum_store_enabled { - return Err(anyhow::anyhow!( - "Quorum store is not enabled locally. Sender: {}, epoch: {}", - peer_id, - self.epoch(), - )); - } + pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> { Ok(validator.verify(self.peer_id, &self.info, &self.signature)?) } } @@ -129,19 +117,7 @@ impl ProofOfStore { self.info.expiration } - pub fn verify( - &self, - peer_id: PeerId, - validator: &ValidatorVerifier, - quorum_store_enabled: bool, - ) -> anyhow::Result<()> { - if !quorum_store_enabled { - return Err(anyhow::anyhow!( - "Quorum store is not enabled locally. Sender: {}, epoch: {}", - peer_id, - self.epoch(), - )); - } + pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> { validator .verify_multi_signatures(&self.info, &self.multi_signature) .context("Failed to verify ProofOfStore") diff --git a/consensus/consensus-types/src/proposal_msg.rs b/consensus/consensus-types/src/proposal_msg.rs index 5532f47c64e6b..0a1eac4c129ff 100644 --- a/consensus/consensus-types/src/proposal_msg.rs +++ b/consensus/consensus-types/src/proposal_msg.rs @@ -4,7 +4,6 @@ use crate::{block::Block, common::Author, sync_info::SyncInfo}; use anyhow::{anyhow, ensure, format_err, Context, Result}; use aptos_types::validator_verifier::ValidatorVerifier; -use aptos_types::PeerId; use serde::{Deserialize, Serialize}; use short_hex_str::AsShortHexStr; use std::fmt; @@ -80,15 +79,10 @@ impl ProposalMsg { Ok(()) } - pub fn verify( - &self, - peer_id: PeerId, - validator: &ValidatorVerifier, - quorum_store_enabled: bool, - ) -> Result<()> { - self.proposal().payload().map_or(Ok(()), |p| { - p.verify(peer_id, validator, quorum_store_enabled) - })?; + pub fn verify(&self, validator: &ValidatorVerifier, quorum_store_enabled: bool) -> Result<()> { + self.proposal() + .payload() + .map_or(Ok(()), |p| p.verify(validator, quorum_store_enabled))?; self.proposal() .validate_signature(validator) diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index 7570dd7a74925..f11de839c01e3 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -756,6 +756,10 @@ impl EpochManager { let maybe_unverified_event = self.check_epoch(peer_id, consensus_msg).await?; if let Some(unverified_event) = maybe_unverified_event { + // quorum store messages are only expected in epochs where it is enabled + self.check_quorum_store_enabled(peer_id, &unverified_event) + .await?; + // same epoch -> run well-formedness + signature check let verified_event = monitor!( "verify_message", @@ -842,6 +846,29 @@ impl EpochManager { Ok(None) } + async fn check_quorum_store_enabled( + &mut self, + peer_id: AccountAddress, + event: &UnverifiedEvent, + ) -> anyhow::Result<()> { + match event { + UnverifiedEvent::FragmentMsg(_) + | UnverifiedEvent::BatchMsg(_) + | UnverifiedEvent::SignedDigestMsg(_) + | UnverifiedEvent::ProofOfStoreMsg(_) => { + if self.quorum_store_enabled { + Ok(()) + } else { + Err(anyhow::anyhow!( + "Quorum store is not enabled locally, but received msg from sender: {}", + peer_id, + )) + } + } + _ => Ok(()), + } + } + fn process_event( &mut self, peer_id: AccountAddress, diff --git a/consensus/src/quorum_store/types.rs b/consensus/src/quorum_store/types.rs index 177a89895e1ba..ff297193243dd 100644 --- a/consensus/src/quorum_store/types.rs +++ b/consensus/src/quorum_store/types.rs @@ -109,14 +109,7 @@ impl Fragment { } } - pub(crate) fn verify(&self, peer_id: PeerId, quorum_store_enabled: bool) -> anyhow::Result<()> { - if !quorum_store_enabled { - return Err(anyhow::anyhow!( - "Quorum store is not enabled locally. Sender: {}, epoch: {}", - peer_id, - self.epoch(), - )); - } + pub(crate) fn verify(&self, peer_id: PeerId) -> anyhow::Result<()> { if let Some(expiration) = &self.fragment_info.maybe_expiration { if expiration.epoch() != self.fragment_info.epoch { return Err(anyhow::anyhow!( @@ -196,14 +189,7 @@ impl Batch { } // Check the source == the sender. To protect from DDoS we check is Payload matches digest later. - pub fn verify(&self, peer_id: PeerId, quorum_store_enabled: bool) -> anyhow::Result<()> { - if !quorum_store_enabled { - return Err(anyhow::anyhow!( - "Quorum store is not enabled locally. Sender: {}, epoch: {}", - peer_id, - self.epoch(), - )); - } + pub fn verify(&self, peer_id: PeerId) -> anyhow::Result<()> { if self.source == peer_id { Ok(()) } else { diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index a51ee553bfc64..2d7c3be54f309 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -81,7 +81,7 @@ impl UnverifiedEvent { ) -> Result { Ok(match self { UnverifiedEvent::ProposalMsg(p) => { - p.verify(peer_id, validator, quorum_store_enabled)?; + p.verify(validator, quorum_store_enabled)?; VerifiedEvent::ProposalMsg(p) } UnverifiedEvent::VoteMsg(v) => { @@ -99,20 +99,20 @@ impl UnverifiedEvent { VerifiedEvent::CommitDecision(cd) } UnverifiedEvent::FragmentMsg(f) => { - f.verify(peer_id, quorum_store_enabled)?; + f.verify(peer_id)?; VerifiedEvent::FragmentMsg(f) } // Only sender is verified. Remaining verification is on-demand (when it's used). UnverifiedEvent::BatchMsg(b) => { - b.verify(peer_id, quorum_store_enabled)?; + b.verify(peer_id)?; VerifiedEvent::UnverifiedBatchMsg(b) } UnverifiedEvent::SignedDigestMsg(sd) => { - sd.verify(peer_id, validator, quorum_store_enabled)?; + sd.verify(validator)?; VerifiedEvent::SignedDigestMsg(sd) } UnverifiedEvent::ProofOfStoreMsg(p) => { - p.verify(peer_id, validator, quorum_store_enabled)?; + p.verify(validator)?; VerifiedEvent::ProofOfStoreMsg(p) } }) From cc72c969fe6e713ef8a95041b3884d2ba9aeb505 Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Fri, 9 Dec 2022 10:41:53 -0800 Subject: [PATCH 06/13] Add quorum store sender messages to QuorumStoreSender trait --- consensus/src/network.rs | 34 +++++++++++++++--------------- consensus/src/network_interface.rs | 2 +- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/consensus/src/network.rs b/consensus/src/network.rs index e402b7d2754bb..ff1a65bb18e7d 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -66,10 +66,14 @@ pub struct NetworkReceivers { } #[async_trait::async_trait] -pub trait QuorumStoreSender { +pub(crate) trait QuorumStoreSender { async fn send_batch(&self, batch: Batch, recipients: Vec); async fn send_signed_digest(&self, signed_digest: SignedDigest, recipients: Vec); + + async fn broadcast_fragment(&mut self, fragment: Fragment); + + async fn broadcast_proof_of_store(&mut self, proof_of_store: ProofOfStore); } /// Implements the actual networking support for all consensus messaging. @@ -244,22 +248,6 @@ impl NetworkSender { self.send(msg, vec![recipient]).await } - // TODO: remove allow(dead_code) when quorum store implementation is added - #[allow(dead_code)] - pub async fn broadcast_fragment(&mut self, fragment: Fragment) { - fail_point!("consensus::send::broadcast_fragment", |_| ()); - let msg = ConsensusMsg::FragmentMsg(Box::new(fragment)); - self.broadcast_without_self(msg).await - } - - // TODO: remove allow(dead_code) when quorum store implementation is added - #[allow(dead_code)] - pub async fn broadcast_proof_of_store(&mut self, proof_of_store: ProofOfStore) { - fail_point!("consensus::send::proof_of_store", |_| ()); - let msg = ConsensusMsg::ProofOfStoreMsg(Box::new(proof_of_store)); - self.broadcast_without_self(msg).await - } - /// Sends the vote to the chosen recipients (typically that would be the recipients that /// we believe could serve as proposers in the next round). The recipients on the receiving /// end are going to be notified about a new vote in the vote queue. @@ -320,6 +308,18 @@ impl QuorumStoreSender for NetworkSender { let msg = ConsensusMsg::SignedDigestMsg(Box::new(signed_digest)); self.send(msg, recipients).await } + + async fn broadcast_fragment(&mut self, fragment: Fragment) { + fail_point!("consensus::send::broadcast_fragment", |_| ()); + let msg = ConsensusMsg::FragmentMsg(Box::new(fragment)); + self.broadcast_without_self(msg).await + } + + async fn broadcast_proof_of_store(&mut self, proof_of_store: ProofOfStore) { + fail_point!("consensus::send::proof_of_store", |_| ()); + let msg = ConsensusMsg::ProofOfStoreMsg(Box::new(proof_of_store)); + self.broadcast_without_self(msg).await + } } pub struct NetworkTask { diff --git a/consensus/src/network_interface.rs b/consensus/src/network_interface.rs index 8482c39523111..36172ef3353f7 100644 --- a/consensus/src/network_interface.rs +++ b/consensus/src/network_interface.rs @@ -73,7 +73,7 @@ pub enum ConsensusMsg { /// Quorum Store: Send a signed batch digest. This is a vote for the batch and a promise that /// the batch of transactions was received and will be persisted until batch expiration. SignedDigestMsg(Box), - /// Quorum Store: Broadcast a completed proof of store (a digest that received 2f+1 votes). + /// Quorum Store: Broadcast a certified proof of store (a digest that received 2f+1 votes). ProofOfStoreMsg(Box), } From c1d64c966b534d583a35f38d958580f9e993fecf Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Fri, 16 Dec 2022 09:38:31 -0800 Subject: [PATCH 07/13] Add BatchRequest --- .../consensus-types/src/proof_of_store.rs | 1 - consensus/src/epoch_manager.rs | 1 + consensus/src/network.rs | 1 + consensus/src/network_interface.rs | 7 ++-- consensus/src/quorum_store/types.rs | 36 ++++++++++++++++--- consensus/src/round_manager.rs | 10 +++++- 6 files changed, 47 insertions(+), 9 deletions(-) diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs index cfcf607e39736..8ff8182fa222f 100644 --- a/consensus/consensus-types/src/proof_of_store.rs +++ b/consensus/consensus-types/src/proof_of_store.rs @@ -53,7 +53,6 @@ impl SignedDigestInfo { } } -// TODO: implement properly (and proper place) w.o. public fields. #[derive(Clone, Debug, Deserialize, Serialize)] pub struct SignedDigest { epoch: u64, diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index f11de839c01e3..542868fa1ca94 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -798,6 +798,7 @@ impl EpochManager { | ConsensusMsg::CommitVoteMsg(_) | ConsensusMsg::CommitDecisionMsg(_) | ConsensusMsg::FragmentMsg(_) + | ConsensusMsg::BatchRequestMsg(_) | ConsensusMsg::BatchMsg(_) | ConsensusMsg::SignedDigestMsg(_) | ConsensusMsg::ProofOfStoreMsg(_) => { diff --git a/consensus/src/network.rs b/consensus/src/network.rs index ff1a65bb18e7d..7d9c60d8c62f9 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -380,6 +380,7 @@ impl NetworkTask { match msg { quorum_store_msg @ (ConsensusMsg::SignedDigestMsg(_) | ConsensusMsg::FragmentMsg(_) + | ConsensusMsg::BatchRequestMsg(_) | ConsensusMsg::BatchMsg(_) | ConsensusMsg::ProofOfStoreMsg(_)) => { if let Err(e) = self.quorum_store_messages_tx.push( diff --git a/consensus/src/network_interface.rs b/consensus/src/network_interface.rs index 36172ef3353f7..f6d3038b26015 100644 --- a/consensus/src/network_interface.rs +++ b/consensus/src/network_interface.rs @@ -4,7 +4,7 @@ //! Interface between Consensus and Network layers. use crate::counters; -use crate::quorum_store::types::{Batch, Fragment}; +use crate::quorum_store::types::{Batch, BatchRequest, Fragment}; use anyhow::anyhow; use aptos_config::network_id::{NetworkId, PeerNetworkId}; use aptos_consensus_types::proof_of_store::{ProofOfStore, SignedDigest}; @@ -67,7 +67,9 @@ pub enum ConsensusMsg { /// Quorum Store: Send a fragment -- a sequence of transactions that are part of an in-progress /// batch -- from the fragment generator to remote validators. FragmentMsg(Box), - /// Quorum Store: Request or response for a completed batch -- a sequence of transactions, + /// Quorum Store: Request the payloads of a completed batch. + BatchRequestMsg(Box), + /// Quorum Store: Respond with a completed batch's payload -- a sequence of transactions, /// identified by its digest. BatchMsg(Box), /// Quorum Store: Send a signed batch digest. This is a vote for the batch and a promise that @@ -93,6 +95,7 @@ impl ConsensusMsg { ConsensusMsg::CommitVoteMsg(_) => "CommitVoteMsg", ConsensusMsg::CommitDecisionMsg(_) => "CommitDecisionMsg", ConsensusMsg::FragmentMsg(_) => "FragmentMsg", + ConsensusMsg::BatchRequestMsg(_) => "BatchRequestMsg", ConsensusMsg::BatchMsg(_) => "BatchMsg", ConsensusMsg::SignedDigestMsg(_) => "SignedDigestMsg", ConsensusMsg::ProofOfStoreMsg(_) => "ProofOfStoreMsg", diff --git a/consensus/src/quorum_store/types.rs b/consensus/src/quorum_store/types.rs index ff297193243dd..9ec2bc78693a3 100644 --- a/consensus/src/quorum_store/types.rs +++ b/consensus/src/quorum_store/types.rs @@ -157,6 +157,35 @@ pub struct BatchInfo { pub(crate) digest: HashValue, } +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] +pub struct BatchRequest { + pub(crate) source: PeerId, + pub(crate) batch_info: BatchInfo, +} + +impl BatchRequest { + pub fn new(source: PeerId, epoch: u64, digest: HashValue) -> Self { + let batch_info = BatchInfo { epoch, digest }; + Self { source, batch_info } + } + + pub fn epoch(&self) -> u64 { + self.batch_info.epoch + } + + pub fn verify(&self, peer_id: PeerId) -> anyhow::Result<()> { + if self.source == peer_id { + Ok(()) + } else { + Err(anyhow::anyhow!( + "Sender mismatch: peer_id: {}, source: {}", + self.source, + peer_id + )) + } + } +} + #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct Batch { pub(crate) source: PeerId, @@ -170,13 +199,10 @@ impl Batch { pub fn new( epoch: u64, source: PeerId, - digest_hash: HashValue, + digest: HashValue, maybe_payload: Option>, ) -> Self { - let batch_info = BatchInfo { - epoch, - digest: digest_hash, - }; + let batch_info = BatchInfo { epoch, digest }; Self { source, maybe_payload, diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index 2d7c3be54f309..8b31036e9597b 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -1,7 +1,7 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 -use crate::quorum_store::types::{Batch, Fragment}; +use crate::quorum_store::types::{Batch, BatchRequest, Fragment}; use crate::{ block_storage::{ tracing::{observe_block, BlockStage}, @@ -65,6 +65,7 @@ pub enum UnverifiedEvent { CommitVote(Box), CommitDecision(Box), FragmentMsg(Box), + BatchRequestMsg(Box), BatchMsg(Box), SignedDigestMsg(Box), ProofOfStoreMsg(Box), @@ -102,6 +103,10 @@ impl UnverifiedEvent { f.verify(peer_id)?; VerifiedEvent::FragmentMsg(f) } + UnverifiedEvent::BatchRequestMsg(br) => { + br.verify(peer_id)?; + VerifiedEvent::BatchRequestMsg(br) + } // Only sender is verified. Remaining verification is on-demand (when it's used). UnverifiedEvent::BatchMsg(b) => { b.verify(peer_id)?; @@ -126,6 +131,7 @@ impl UnverifiedEvent { UnverifiedEvent::CommitVote(cv) => cv.epoch(), UnverifiedEvent::CommitDecision(cd) => cd.epoch(), UnverifiedEvent::FragmentMsg(f) => f.epoch(), + UnverifiedEvent::BatchRequestMsg(br) => br.epoch(), UnverifiedEvent::BatchMsg(b) => b.epoch(), UnverifiedEvent::SignedDigestMsg(sd) => sd.epoch(), UnverifiedEvent::ProofOfStoreMsg(p) => p.epoch(), @@ -142,6 +148,7 @@ impl From for UnverifiedEvent { ConsensusMsg::CommitVoteMsg(m) => UnverifiedEvent::CommitVote(m), ConsensusMsg::CommitDecisionMsg(m) => UnverifiedEvent::CommitDecision(m), ConsensusMsg::FragmentMsg(m) => UnverifiedEvent::FragmentMsg(m), + ConsensusMsg::BatchRequestMsg(m) => UnverifiedEvent::BatchRequestMsg(m), ConsensusMsg::BatchMsg(m) => UnverifiedEvent::BatchMsg(m), ConsensusMsg::SignedDigestMsg(m) => UnverifiedEvent::SignedDigestMsg(m), ConsensusMsg::ProofOfStoreMsg(m) => UnverifiedEvent::ProofOfStoreMsg(m), @@ -160,6 +167,7 @@ pub enum VerifiedEvent { CommitVote(Box), CommitDecision(Box), FragmentMsg(Box), + BatchRequestMsg(Box), UnverifiedBatchMsg(Box), SignedDigestMsg(Box), ProofOfStoreMsg(Box), From 9897303d234434ff31b32c5348104eb38e1f4b0c Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Fri, 16 Dec 2022 09:45:19 -0800 Subject: [PATCH 08/13] Clean up visibility - Private for all members of "message" types. We will likely have to implement getters/setters when integrating QS implementation - Public for all members of *Info types. These are pure containers. - Remove (crate) where the mod is already pub(crate) --- .../consensus-types/src/proof_of_store.rs | 6 +-- consensus/src/quorum_store/types.rs | 46 +++++++++---------- 2 files changed, 25 insertions(+), 27 deletions(-) diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs index 8ff8182fa222f..441ddcc4f0c31 100644 --- a/consensus/consensus-types/src/proof_of_store.rs +++ b/consensus/consensus-types/src/proof_of_store.rs @@ -56,9 +56,9 @@ impl SignedDigestInfo { #[derive(Clone, Debug, Deserialize, Serialize)] pub struct SignedDigest { epoch: u64, - pub peer_id: PeerId, - pub info: SignedDigestInfo, - pub signature: bls12381::Signature, + peer_id: PeerId, + info: SignedDigestInfo, + signature: bls12381::Signature, } impl SignedDigest { diff --git a/consensus/src/quorum_store/types.rs b/consensus/src/quorum_store/types.rs index 9ec2bc78693a3..6db7fc2d97919 100644 --- a/consensus/src/quorum_store/types.rs +++ b/consensus/src/quorum_store/types.rs @@ -9,13 +9,13 @@ use bcs::to_bytes; use serde::{Deserialize, Serialize}; use std::mem; -pub(crate) type BatchId = u64; +pub type BatchId = u64; #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct SerializedTransaction { - // pub(crate) for testing purposes + // pub for testing purposes #[serde(with = "serde_bytes")] - pub(crate) bytes: Vec, + pub bytes: Vec, } impl SerializedTransaction { @@ -64,27 +64,27 @@ impl FragmentInfo { } } - pub(crate) fn take_transactions(self) -> Vec { + pub fn take_transactions(self) -> Vec { self.payload } - pub(crate) fn fragment_id(&self) -> usize { + pub fn fragment_id(&self) -> usize { self.fragment_id } - pub(crate) fn batch_id(&self) -> BatchId { + pub fn batch_id(&self) -> BatchId { self.batch_id } - pub(crate) fn maybe_expiration(&self) -> Option { + pub fn maybe_expiration(&self) -> Option { self.maybe_expiration } } #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Fragment { - pub source: PeerId, - pub fragment_info: FragmentInfo, + source: PeerId, + fragment_info: FragmentInfo, } impl Fragment { @@ -109,7 +109,7 @@ impl Fragment { } } - pub(crate) fn verify(&self, peer_id: PeerId) -> anyhow::Result<()> { + pub fn verify(&self, peer_id: PeerId) -> anyhow::Result<()> { if let Some(expiration) = &self.fragment_info.maybe_expiration { if expiration.epoch() != self.fragment_info.epoch { return Err(anyhow::anyhow!( @@ -130,37 +130,37 @@ impl Fragment { } } - pub(crate) fn epoch(&self) -> u64 { + pub fn epoch(&self) -> u64 { self.fragment_info.epoch } - pub(crate) fn take_transactions(self) -> Vec { + pub fn take_transactions(self) -> Vec { self.fragment_info.take_transactions() } - pub(crate) fn source(&self) -> PeerId { + pub fn source(&self) -> PeerId { self.source } - pub(crate) fn fragment_id(&self) -> usize { + pub fn fragment_id(&self) -> usize { self.fragment_info.fragment_id() } - pub(crate) fn batch_id(&self) -> BatchId { + pub fn batch_id(&self) -> BatchId { self.fragment_info.batch_id() } } #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct BatchInfo { - pub(crate) epoch: u64, - pub(crate) digest: HashValue, + pub epoch: u64, + pub digest: HashValue, } #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct BatchRequest { - pub(crate) source: PeerId, - pub(crate) batch_info: BatchInfo, + source: PeerId, + batch_info: BatchInfo, } impl BatchRequest { @@ -188,13 +188,11 @@ impl BatchRequest { #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct Batch { - pub(crate) source: PeerId, - // None is a request, Some(payload) is a response. - pub(crate) maybe_payload: Option>, - pub(crate) batch_info: BatchInfo, + source: PeerId, + batch_info: BatchInfo, + maybe_payload: Option>, } -// TODO: make epoch, source, signature fields treatment consistent across structs. impl Batch { pub fn new( epoch: u64, From 0abd9fc71869e939db3ab0beb13613455b2bdeb3 Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Fri, 16 Dec 2022 10:46:36 -0800 Subject: [PATCH 09/13] Some cleanup and fixes --- consensus/src/epoch_manager.rs | 8 ++++---- consensus/src/quorum_store/types.rs | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index 542868fa1ca94..72a34eb6c367b 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -756,9 +756,8 @@ impl EpochManager { let maybe_unverified_event = self.check_epoch(peer_id, consensus_msg).await?; if let Some(unverified_event) = maybe_unverified_event { - // quorum store messages are only expected in epochs where it is enabled - self.check_quorum_store_enabled(peer_id, &unverified_event) - .await?; + // filter out quorum store messages if quorum store has not been enabled + self.filter_quorum_store_events(peer_id, &unverified_event)?; // same epoch -> run well-formedness + signature check let verified_event = monitor!( @@ -847,13 +846,14 @@ impl EpochManager { Ok(None) } - async fn check_quorum_store_enabled( + fn filter_quorum_store_events( &mut self, peer_id: AccountAddress, event: &UnverifiedEvent, ) -> anyhow::Result<()> { match event { UnverifiedEvent::FragmentMsg(_) + | UnverifiedEvent::BatchRequestMsg(_) | UnverifiedEvent::BatchMsg(_) | UnverifiedEvent::SignedDigestMsg(_) | UnverifiedEvent::ProofOfStoreMsg(_) => { diff --git a/consensus/src/quorum_store/types.rs b/consensus/src/quorum_store/types.rs index 6db7fc2d97919..6e4a4f6030ae7 100644 --- a/consensus/src/quorum_store/types.rs +++ b/consensus/src/quorum_store/types.rs @@ -64,7 +64,7 @@ impl FragmentInfo { } } - pub fn take_transactions(self) -> Vec { + pub fn into_transactions(self) -> Vec { self.payload } @@ -134,8 +134,8 @@ impl Fragment { self.fragment_info.epoch } - pub fn take_transactions(self) -> Vec { - self.fragment_info.take_transactions() + pub fn into_transactions(self) -> Vec { + self.fragment_info.into_transactions() } pub fn source(&self) -> PeerId { From eef6752da01eeb4b9b67b1ec89e2ced3a1af4cf9 Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Fri, 16 Dec 2022 14:51:48 -0800 Subject: [PATCH 10/13] remove redundant code --- consensus/src/network.rs | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/consensus/src/network.rs b/consensus/src/network.rs index 29bd4fb7ffd72..f457474336ded 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -370,6 +370,22 @@ impl NetworkTask { ) } + fn push_msg( + peer_id: AccountAddress, + msg: ConsensusMsg, + tx: &aptos_channel::Sender< + (AccountAddress, Discriminant), + (AccountAddress, ConsensusMsg), + >, + ) { + if let Err(e) = tx.push((peer_id, discriminant(&msg)), (peer_id, msg)) { + warn!( + remote_peer = peer_id, + error = ?e, "Error pushing consensus msg", + ); + } + } + pub async fn start(mut self) { while let Some(message) = self.all_events.next().await { match message { @@ -383,15 +399,11 @@ impl NetworkTask { | ConsensusMsg::BatchRequestMsg(_) | ConsensusMsg::BatchMsg(_) | ConsensusMsg::ProofOfStoreMsg(_)) => { - if let Err(e) = self.quorum_store_messages_tx.push( - (peer_id, discriminant(&quorum_store_msg)), - (peer_id, quorum_store_msg), - ) { - warn!( - remote_peer = peer_id, - error = ?e, "Error pushing consensus quorum store msg", - ); - } + Self::push_msg( + peer_id, + quorum_store_msg, + &self.quorum_store_messages_tx, + ); } consensus_msg => { if let ConsensusMsg::ProposalMsg(proposal) = &consensus_msg { @@ -400,15 +412,7 @@ impl NetworkTask { BlockStage::NETWORK_RECEIVED, ); } - if let Err(e) = self.consensus_messages_tx.push( - (peer_id, discriminant(&consensus_msg)), - (peer_id, consensus_msg), - ) { - warn!( - remote_peer = peer_id, - error = ?e, "Error pushing consensus msg", - ); - } + Self::push_msg(peer_id, consensus_msg, &self.consensus_messages_tx); } } } From a762ec18c4b606cc2bf64a905edb432d86e601c6 Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Fri, 16 Dec 2022 15:48:23 -0800 Subject: [PATCH 11/13] more cleanup --- consensus/src/quorum_store/types.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/consensus/src/quorum_store/types.rs b/consensus/src/quorum_store/types.rs index 6e4a4f6030ae7..86112419cccce 100644 --- a/consensus/src/quorum_store/types.rs +++ b/consensus/src/quorum_store/types.rs @@ -190,21 +190,21 @@ impl BatchRequest { pub struct Batch { source: PeerId, batch_info: BatchInfo, - maybe_payload: Option>, + payload: Vec, } impl Batch { pub fn new( epoch: u64, - source: PeerId, digest: HashValue, - maybe_payload: Option>, + source: PeerId, + payload: Vec, ) -> Self { let batch_info = BatchInfo { epoch, digest }; Self { source, - maybe_payload, batch_info, + payload, } } @@ -226,6 +226,6 @@ impl Batch { } pub fn get_payload(self) -> Vec { - self.maybe_payload.expect("Batch contains no payload") + self.payload } } From 404814e9db36f894b5a3fd1df12e16bb9d9267ea Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Fri, 16 Dec 2022 17:17:06 -0800 Subject: [PATCH 12/13] regenerate record --- .../tests/staged/consensus.yaml | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/testsuite/generate-format/tests/staged/consensus.yaml b/testsuite/generate-format/tests/staged/consensus.yaml index 0b926635641fe..8ae584fadf858 100644 --- a/testsuite/generate-format/tests/staged/consensus.yaml +++ b/testsuite/generate-format/tests/staged/consensus.yaml @@ -42,17 +42,22 @@ Batch: STRUCT: - source: TYPENAME: AccountAddress - - maybe_payload: - OPTION: - SEQ: - TYPENAME: SignedTransaction - batch_info: TYPENAME: BatchInfo + - payload: + SEQ: + TYPENAME: SignedTransaction BatchInfo: STRUCT: - epoch: U64 - digest: TYPENAME: HashValue +BatchRequest: + STRUCT: + - source: + TYPENAME: AccountAddress + - batch_info: + TYPENAME: BatchInfo BitVec: STRUCT: - inner: BYTES @@ -210,14 +215,18 @@ ConsensusMsg: NEWTYPE: TYPENAME: Fragment 10: + BatchRequestMsg: + NEWTYPE: + TYPENAME: BatchRequest + 11: BatchMsg: NEWTYPE: TYPENAME: Batch - 11: + 12: SignedDigestMsg: NEWTYPE: TYPENAME: SignedDigest - 12: + 13: ProofOfStoreMsg: NEWTYPE: TYPENAME: ProofOfStore From 0b90884d4a47b1f2235f44fee95324c07a313b6f Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Sun, 18 Dec 2022 11:11:09 -0800 Subject: [PATCH 13/13] reduce queue size and add TODO --- consensus/src/network.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/consensus/src/network.rs b/consensus/src/network.rs index f457474336ded..19ed1d2fc03ed 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -346,7 +346,8 @@ impl NetworkTask { aptos_channel::new(QueueStyle::LIFO, 1, Some(&counters::CONSENSUS_CHANNEL_MSGS)); let (quorum_store_messages_tx, quorum_store_messages) = aptos_channel::new( QueueStyle::FIFO, - 1000, + // TODO: tune this value based on quorum store messages with backpressure + 50, Some(&counters::QUORUM_STORE_CHANNEL_MSGS), ); let (block_retrieval_tx, block_retrieval) = aptos_channel::new(