
[Quorum Store] networking integration #5779

Merged · 14 commits · Dec 18, 2022
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions consensus/Cargo.toml
@@ -52,6 +52,7 @@ num-traits = { workspace = true }
once_cell = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
55 changes: 52 additions & 3 deletions consensus/consensus-types/src/proof_of_store.rs
@@ -3,11 +3,14 @@

use crate::common::Round;
use anyhow::Context;
use aptos_crypto::HashValue;
use aptos_crypto::{bls12381, CryptoMaterialError, HashValue};
use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher};
use aptos_types::aggregate_signature::AggregateSignature;
use aptos_types::validator_signer::ValidatorSigner;
use aptos_types::validator_verifier::ValidatorVerifier;
use aptos_types::PeerId;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub struct LogicalTime {
@@ -39,14 +42,60 @@ pub struct SignedDigestInfo {
pub num_bytes: u64,
}

impl SignedDigestInfo {
pub fn new(digest: HashValue, expiration: LogicalTime, num_txns: u64, num_bytes: u64) -> Self {
Self {
digest,
expiration,
num_txns,
num_bytes,
}
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SignedDigest {
epoch: u64,
peer_id: PeerId,
info: SignedDigestInfo,
signature: bls12381::Signature,
}

impl SignedDigest {
pub fn new(
epoch: u64,
digest: HashValue,
expiration: LogicalTime,
num_txns: u64,
num_bytes: u64,
validator_signer: Arc<ValidatorSigner>,
) -> Result<Self, CryptoMaterialError> {
let info = SignedDigestInfo::new(digest, expiration, num_txns, num_bytes);
let signature = validator_signer.sign(&info)?;

Ok(Self {
epoch,
peer_id: validator_signer.author(),
info,
signature,
})
}

pub fn epoch(&self) -> u64 {
self.epoch
}

pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
Ok(validator.verify(self.peer_id, &self.info, &self.signature)?)
}
}

#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
#[allow(dead_code)]
pub struct ProofOfStore {
info: SignedDigestInfo,
multi_signature: AggregateSignature,
}

#[allow(dead_code)]
impl ProofOfStore {
pub fn new(info: SignedDigestInfo, multi_signature: AggregateSignature) -> Self {
Self {
10 changes: 10 additions & 0 deletions consensus/src/counters.rs
@@ -491,6 +491,16 @@ pub static ROUND_MANAGER_CHANNEL_MSGS: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Counters(queued,dequeued,dropped) related to quorum store channel
pub static QUORUM_STORE_CHANNEL_MSGS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"aptos_quorum_store_channel_msgs_count",
"Counters(queued,dequeued,dropped) related to quorum store channel",
&["state"]
)
.unwrap()
});

/// Counters(queued,dequeued,dropped) related to block retrieval channel
pub static BLOCK_RETRIEVAL_CHANNEL_MSGS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
42 changes: 38 additions & 4 deletions consensus/src/epoch_manager.rs
@@ -753,12 +753,17 @@ impl EpochManager {
let maybe_unverified_event = self.check_epoch(peer_id, consensus_msg).await?;

if let Some(unverified_event) = maybe_unverified_event {
// filter out quorum store messages if quorum store has not been enabled
self.filter_quorum_store_events(peer_id, &unverified_event)?;

// same epoch -> run well-formedness + signature check
let verified_event = monitor!(
"verify_message",
unverified_event
.clone()
.verify(&self.epoch_state().verifier, self.quorum_store_enabled)
unverified_event.clone().verify(
peer_id,
&self.epoch_state().verifier,
self.quorum_store_enabled
)
)
.context("[EpochManager] Verify event")
.map_err(|err| {
@@ -787,7 +792,12 @@
| ConsensusMsg::SyncInfo(_)
| ConsensusMsg::VoteMsg(_)
| ConsensusMsg::CommitVoteMsg(_)
| ConsensusMsg::CommitDecisionMsg(_) => {
| ConsensusMsg::CommitDecisionMsg(_)
| ConsensusMsg::FragmentMsg(_)
| ConsensusMsg::BatchRequestMsg(_)
| ConsensusMsg::BatchMsg(_)
| ConsensusMsg::SignedDigestMsg(_)
| ConsensusMsg::ProofOfStoreMsg(_) => {
let event: UnverifiedEvent = msg.into();
if event.epoch() == self.epoch() {
return Ok(Some(event));
@@ -833,6 +843,30 @@
Ok(None)
}

fn filter_quorum_store_events(
&mut self,
peer_id: AccountAddress,
event: &UnverifiedEvent,
) -> anyhow::Result<()> {
match event {
UnverifiedEvent::FragmentMsg(_)
| UnverifiedEvent::BatchRequestMsg(_)
| UnverifiedEvent::BatchMsg(_)
| UnverifiedEvent::SignedDigestMsg(_)
| UnverifiedEvent::ProofOfStoreMsg(_) => {
if self.quorum_store_enabled {
Ok(())
} else {
Err(anyhow::anyhow!(
"Quorum store is not enabled locally, but received msg from sender: {}",
peer_id,
))
}
}
_ => Ok(()),
}
}

fn process_event(
&mut self,
peer_id: AccountAddress,
2 changes: 1 addition & 1 deletion consensus/src/experimental/tests/buffer_manager_tests.rs
@@ -206,7 +206,7 @@ async fn loopback_commit_vote(
let event: UnverifiedEvent = msg.into();
// verify the message and send the message into self loop
msg_tx
.push(author, event.verify(verifier, false).unwrap())
.push(author, event.verify(author, verifier, false).unwrap())
.ok();
}
}
113 changes: 99 additions & 14 deletions consensus/src/network.rs
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::block_storage::tracing::{observe_block, BlockStage};
use crate::quorum_store::types::{Batch, Fragment};
use crate::{
counters,
logging::LogEvent,
@@ -14,6 +15,7 @@ use aptos_consensus_types::{
block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse, MAX_BLOCKS_PER_REQUEST},
common::Author,
experimental::{commit_decision::CommitDecision, commit_vote::CommitVote},
proof_of_store::{ProofOfStore, SignedDigest},
proposal_msg::ProposalMsg,
sync_info::SyncInfo,
vote_msg::VoteMsg,
@@ -55,10 +57,25 @@ pub struct NetworkReceivers {
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
>,
pub quorum_store_messages: aptos_channel::Receiver<
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
>,
pub block_retrieval:
aptos_channel::Receiver<AccountAddress, (AccountAddress, IncomingBlockRetrievalRequest)>,
}

#[async_trait::async_trait]
pub(crate) trait QuorumStoreSender {
async fn send_batch(&self, batch: Batch, recipients: Vec<Author>);

async fn send_signed_digest(&self, signed_digest: SignedDigest, recipients: Vec<Author>);

async fn broadcast_fragment(&mut self, fragment: Fragment);

async fn broadcast_proof_of_store(&mut self, proof_of_store: ProofOfStore);
}

/// Implements the actual networking support for all consensus messaging.
#[derive(Clone)]
pub struct NetworkSender {
@@ -142,6 +159,13 @@ impl NetworkSender {
error!("Error broadcasting to self: {:?}", err);
}

self.broadcast_without_self(msg).await;
}

/// Tries to send the given msg to all the participants, excluding self.
async fn broadcast_without_self(&mut self, msg: ConsensusMsg) {
fail_point!("consensus::send::any", |_| ());

// Get the list of validators excluding our own account address. Note the
// ordering is not important in this case.
let self_author = self.author;
@@ -271,11 +295,42 @@
}
}

#[async_trait::async_trait]
impl QuorumStoreSender for NetworkSender {
async fn send_batch(&self, batch: Batch, recipients: Vec<Author>) {
fail_point!("consensus::send_batch", |_| ());
let msg = ConsensusMsg::BatchMsg(Box::new(batch));
self.send(msg, recipients).await
}

async fn send_signed_digest(&self, signed_digest: SignedDigest, recipients: Vec<Author>) {
fail_point!("consensus::send_signed_digest", |_| ());
let msg = ConsensusMsg::SignedDigestMsg(Box::new(signed_digest));
self.send(msg, recipients).await
}

async fn broadcast_fragment(&mut self, fragment: Fragment) {
fail_point!("consensus::send::broadcast_fragment", |_| ());
let msg = ConsensusMsg::FragmentMsg(Box::new(fragment));
self.broadcast_without_self(msg).await
}

async fn broadcast_proof_of_store(&mut self, proof_of_store: ProofOfStore) {
fail_point!("consensus::send::proof_of_store", |_| ());
let msg = ConsensusMsg::ProofOfStoreMsg(Box::new(proof_of_store));
self.broadcast_without_self(msg).await
}
}

pub struct NetworkTask {
consensus_messages_tx: aptos_channel::Sender<
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
>,
quorum_store_messages_tx: aptos_channel::Sender<
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
>,
block_retrieval_tx:
aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingBlockRetrievalRequest)>,
all_events: Box<dyn Stream<Item = Event<ConsensusMsg>> + Send + Unpin>,
@@ -289,6 +344,11 @@ impl NetworkTask {
) -> (NetworkTask, NetworkReceivers) {
let (consensus_messages_tx, consensus_messages) =
aptos_channel::new(QueueStyle::LIFO, 1, Some(&counters::CONSENSUS_CHANNEL_MSGS));
let (quorum_store_messages_tx, quorum_store_messages) = aptos_channel::new(
QueueStyle::FIFO,
1000,
Contributor: this is a lot of messages, why do we need to buffer so many?

Contributor Author: We really shouldn't need to buffer this many, especially if back pressure is working well. However, it looks like we'll have to redo back pressure, so I'm inclined to just keep this for now and revisit it later.

Contributor: This may consume a significant amount of memory if it isn't dequeued fast enough; imagine a fragment message at the maximum message size (~64 MB) * 1000.

Contributor Author: Yeah, that's probably risky even with a single bad validator and quorum store off. Reduced to 50 for now.
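
To make the memory concern above concrete, here is a back-of-the-envelope sketch in Rust, assuming the ~64 MB maximum message size and the 1000 and 50 capacities quoted in this thread; the constant and helper names below are illustrative and not part of this PR.

// Rough worst-case memory held by the quorum store channel if every queued
// message is a fragment at the maximum network message size (~64 MB).
const MAX_MSG_BYTES: u64 = 64 * 1024 * 1024;

fn worst_case_buffer_bytes(channel_capacity: u64) -> u64 {
    channel_capacity * MAX_MSG_BYTES
}

fn main() {
    // Capacity 1000: on the order of 64 GB of buffered fragments in the worst case.
    println!("capacity 1000 -> {} bytes", worst_case_buffer_bytes(1000));
    // Capacity 50: roughly 3.2 GB, which is the rationale for lowering the value.
    println!("capacity 50   -> {} bytes", worst_case_buffer_bytes(50));
}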

Some(&counters::QUORUM_STORE_CHANNEL_MSGS),
);
let (block_retrieval_tx, block_retrieval) = aptos_channel::new(
QueueStyle::LIFO,
1,
@@ -298,37 +358,62 @@
(
NetworkTask {
consensus_messages_tx,
quorum_store_messages_tx,
block_retrieval_tx,
all_events,
},
NetworkReceivers {
consensus_messages,
quorum_store_messages,
block_retrieval,
},
)
}

fn push_msg(
peer_id: AccountAddress,
msg: ConsensusMsg,
tx: &aptos_channel::Sender<
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
>,
) {
if let Err(e) = tx.push((peer_id, discriminant(&msg)), (peer_id, msg)) {
warn!(
remote_peer = peer_id,
error = ?e, "Error pushing consensus msg",
);
}
}

pub async fn start(mut self) {
while let Some(message) = self.all_events.next().await {
match message {
Event::Message(peer_id, msg) => {
counters::CONSENSUS_RECEIVED_MSGS
.with_label_values(&[msg.name()])
.inc();
if let ConsensusMsg::ProposalMsg(proposal) = &msg {
observe_block(
proposal.proposal().timestamp_usecs(),
BlockStage::NETWORK_RECEIVED,
);
}
if let Err(e) = self
.consensus_messages_tx
.push((peer_id, discriminant(&msg)), (peer_id, msg))
{
warn!(
remote_peer = peer_id,
error = ?e, "Error pushing consensus msg",
);
match msg {
quorum_store_msg @ (ConsensusMsg::SignedDigestMsg(_)
| ConsensusMsg::FragmentMsg(_)
| ConsensusMsg::BatchRequestMsg(_)
| ConsensusMsg::BatchMsg(_)
| ConsensusMsg::ProofOfStoreMsg(_)) => {
Self::push_msg(
peer_id,
quorum_store_msg,
&self.quorum_store_messages_tx,
);
}
consensus_msg => {
if let ConsensusMsg::ProposalMsg(proposal) = &consensus_msg {
observe_block(
proposal.proposal().timestamp_usecs(),
BlockStage::NETWORK_RECEIVED,
);
}
Self::push_msg(peer_id, consensus_msg, &self.consensus_messages_tx);
}
}
}
Event::RpcRequest(peer_id, msg, protocol, callback) => match msg {