
[Quorum Store] networking integration #5779

Merged · 14 commits · Dec 18, 2022
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions consensus/Cargo.toml
@@ -50,6 +50,7 @@ rand = { workspace = true }
safety-rules = { workspace = true }
schemadb = { workspace = true }
serde = { workspace = true }
serde_bytes = { workspace = true }
serde_json = { workspace = true }
short-hex-str = { workspace = true }
storage-interface = { workspace = true }
56 changes: 53 additions & 3 deletions consensus/consensus-types/src/proof_of_store.rs
@@ -3,11 +3,14 @@

use crate::common::Round;
use anyhow::Context;
use aptos_crypto::HashValue;
use aptos_crypto::{bls12381, CryptoMaterialError, HashValue};
use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher};
use aptos_types::aggregate_signature::AggregateSignature;
use aptos_types::validator_signer::ValidatorSigner;
use aptos_types::validator_verifier::ValidatorVerifier;
use aptos_types::PeerId;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub struct LogicalTime {
@@ -39,14 +42,61 @@ pub struct SignedDigestInfo {
pub num_bytes: u64,
}

impl SignedDigestInfo {
pub fn new(digest: HashValue, expiration: LogicalTime, num_txns: u64, num_bytes: u64) -> Self {
Self {
digest,
expiration,
num_txns,
num_bytes,
}
}
}

// TODO: implement properly (and proper place) w.o. public fields.

Review thread:

Contributor Author: @sasha8 @gelash would you consider this resolved?

Contributor: Not sure. Fields in SignedDigestInfo and SignedDigest are public. @gelash?

Contributor: Maybe it's okay, but I guess we wanted to revisit whether the fields should be pub or not.

Contributor: This looks very weird, especially since one field is private with a getter while the others are public; we should be consistent.

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SignedDigest {
epoch: u64,
pub peer_id: PeerId,
pub info: SignedDigestInfo,
pub signature: bls12381::Signature,
}

impl SignedDigest {
pub fn new(
epoch: u64,
digest: HashValue,
expiration: LogicalTime,
num_txns: u64,
num_bytes: u64,
validator_signer: Arc<ValidatorSigner>,
) -> Result<Self, CryptoMaterialError> {
let info = SignedDigestInfo::new(digest, expiration, num_txns, num_bytes);
let signature = validator_signer.sign(&info)?;

Ok(Self {
epoch,
peer_id: validator_signer.author(),
info,
signature,
})
}

pub fn epoch(&self) -> u64 {
self.epoch
}

pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
Ok(validator.verify(self.peer_id, &self.info, &self.signature)?)
}
}

#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq)]
#[allow(dead_code)]
pub struct ProofOfStore {
info: SignedDigestInfo,
multi_signature: AggregateSignature,
}

#[allow(dead_code)]
impl ProofOfStore {
pub fn new(info: SignedDigestInfo, multi_signature: AggregateSignature) -> Self {
Self {
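On the field-visibility thread above, a minimal hypothetical sketch (not code from this PR) of the consistent shape the reviewers ask for: every field private, with uniform read-only getters.

```rust
// Illustrative sketch only, using the imports already shown in this diff:
// SignedDigest with all fields private and uniform accessors, resolving the
// pub-field-vs-getter inconsistency flagged in the review thread.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SignedDigest {
    epoch: u64,
    peer_id: PeerId,
    info: SignedDigestInfo,
    signature: bls12381::Signature,
}

impl SignedDigest {
    pub fn epoch(&self) -> u64 {
        self.epoch
    }

    pub fn peer_id(&self) -> PeerId {
        self.peer_id
    }

    pub fn info(&self) -> &SignedDigestInfo {
        &self.info
    }

    pub fn signature(&self) -> &bls12381::Signature {
        &self.signature
    }
}
```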
10 changes: 10 additions & 0 deletions consensus/src/counters.rs
@@ -491,6 +491,16 @@ pub static ROUND_MANAGER_CHANNEL_MSGS: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Counters(queued,dequeued,dropped) related to quorum store channel
pub static QUORUM_STORE_CHANNEL_MSGS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"aptos_quorum_store_channel_msgs_count",
"Counters(queued,dequeued,dropped) related to quorum store channel",
&["state"]
)
.unwrap()
});

/// Counters(queued,dequeued,dropped) related to block retrieval channel
pub static BLOCK_RETRIEVAL_CHANNEL_MSGS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
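For context, a hypothetical sketch of how one state of this vector counter gets bumped. In this codebase the aptos_channel constructed with the counter updates the "queued", "dequeued", and "dropped" labels itself; the standalone function below is illustrative only.

```rust
// Illustrative only: manually incrementing the "dropped" state of the
// quorum store channel counter via the standard prometheus vector API.
fn record_dropped_msg() {
    QUORUM_STORE_CHANNEL_MSGS
        .with_label_values(&["dropped"])
        .inc();
}
```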
41 changes: 37 additions & 4 deletions consensus/src/epoch_manager.rs
@@ -756,12 +756,18 @@ impl EpochManager {
let maybe_unverified_event = self.check_epoch(peer_id, consensus_msg).await?;

if let Some(unverified_event) = maybe_unverified_event {
// quorum store messages are only expected in epochs where it is enabled
self.check_quorum_store_enabled(peer_id, &unverified_event)
.await?;

// same epoch -> run well-formedness + signature check
let verified_event = monitor!(
"verify_message",
unverified_event
.clone()
.verify(&self.epoch_state().verifier, self.quorum_store_enabled)
unverified_event.clone().verify(
peer_id,
&self.epoch_state().verifier,
self.quorum_store_enabled
)
)
.context("[EpochManager] Verify event")
.map_err(|err| {
@@ -790,7 +796,11 @@
| ConsensusMsg::SyncInfo(_)
| ConsensusMsg::VoteMsg(_)
| ConsensusMsg::CommitVoteMsg(_)
| ConsensusMsg::CommitDecisionMsg(_) => {
| ConsensusMsg::CommitDecisionMsg(_)
| ConsensusMsg::FragmentMsg(_)
| ConsensusMsg::BatchMsg(_)
| ConsensusMsg::SignedDigestMsg(_)
| ConsensusMsg::ProofOfStoreMsg(_) => {
let event: UnverifiedEvent = msg.into();
if event.epoch() == self.epoch() {
return Ok(Some(event));
@@ -836,6 +846,29 @@
Ok(None)
}

async fn check_quorum_store_enabled(
Review thread:

Contributor: The name is confusing. Maybe check_unverified_event_type?

Contributor: This is not an async function. Also, isn't this redundant, since we already pass self.quorum_store_enabled to the verify function?

Contributor Author: I originally did the check in the verify function, but changed it to a separate check based on Sasha's feedback (#5779 (comment)). I think this is a bit clearer; it avoids dirtying the verify function, which should really only verify the correctness of the message.

&mut self,
peer_id: AccountAddress,
event: &UnverifiedEvent,
) -> anyhow::Result<()> {
match event {
UnverifiedEvent::FragmentMsg(_)
| UnverifiedEvent::BatchMsg(_)
| UnverifiedEvent::SignedDigestMsg(_)
| UnverifiedEvent::ProofOfStoreMsg(_) => {
if self.quorum_store_enabled {
Ok(())
} else {
Err(anyhow::anyhow!(
"Quorum store is not enabled locally, but received msg from sender: {}",
peer_id,
))
}
}
_ => Ok(()),
}
}

fn process_event(
&mut self,
peer_id: AccountAddress,
2 changes: 1 addition & 1 deletion consensus/src/experimental/tests/buffer_manager_tests.rs
@@ -206,7 +206,7 @@ async fn loopback_commit_vote(
let event: UnverifiedEvent = msg.into();
// verify the message and send the message into self loop
msg_tx
.push(author, event.verify(verifier, false).unwrap())
.push(author, event.verify(author, verifier, false).unwrap())
.ok();
}
}
108 changes: 94 additions & 14 deletions consensus/src/network.rs
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::block_storage::tracing::{observe_block, BlockStage};
use crate::quorum_store::types::{Batch, Fragment};
use crate::{
counters,
logging::LogEvent,
@@ -13,6 +14,7 @@ use aptos_consensus_types::{
block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse, MAX_BLOCKS_PER_REQUEST},
common::Author,
experimental::{commit_decision::CommitDecision, commit_vote::CommitVote},
proof_of_store::{ProofOfStore, SignedDigest},
proposal_msg::ProposalMsg,
sync_info::SyncInfo,
vote_msg::VoteMsg,
@@ -55,10 +57,25 @@ pub struct NetworkReceivers {
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
>,
pub quorum_store_messages: aptos_channel::Receiver<
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
>,
pub block_retrieval:
aptos_channel::Receiver<AccountAddress, (AccountAddress, IncomingBlockRetrievalRequest)>,
}

#[async_trait::async_trait]
pub(crate) trait QuorumStoreSender {
async fn send_batch(&self, batch: Batch, recipients: Vec<Author>);

async fn send_signed_digest(&self, signed_digest: SignedDigest, recipients: Vec<Author>);

async fn broadcast_fragment(&mut self, fragment: Fragment);

async fn broadcast_proof_of_store(&mut self, proof_of_store: ProofOfStore);
}
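
A hypothetical usage sketch (the function and its name below are illustrative assumptions, not part of this PR): keeping quorum store components generic over QuorumStoreSender lets unit tests substitute a mock transport for the concrete NetworkSender.

```rust
// Illustrative only: a component generic over the sender trait.
async fn disseminate_proof<S: QuorumStoreSender>(sender: &mut S, proof: ProofOfStore) {
    // Broadcast the aggregated proof to the other validators; the
    // NetworkSender implementation below excludes the local validator
    // from the recipient set.
    sender.broadcast_proof_of_store(proof).await;
}
```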

/// Implements the actual networking support for all consensus messaging.
#[derive(Clone)]
pub struct NetworkSender {
@@ -142,6 +159,13 @@ impl NetworkSender {
error!("Error broadcasting to self: {:?}", err);
}

self.broadcast_without_self(msg).await;
}

/// Tries to send the given msg to all the participants, excluding self.
async fn broadcast_without_self(&mut self, msg: ConsensusMsg) {
fail_point!("consensus::send::any", |_| ());

// Get the list of validators excluding our own account address. Note the
// ordering is not important in this case.
let self_author = self.author;
@@ -271,11 +295,42 @@
}
}

#[async_trait::async_trait]
impl QuorumStoreSender for NetworkSender {
async fn send_batch(&self, batch: Batch, recipients: Vec<Author>) {
fail_point!("consensus::send_batch", |_| ());
let msg = ConsensusMsg::BatchMsg(Box::new(batch));
self.send(msg, recipients).await
}

async fn send_signed_digest(&self, signed_digest: SignedDigest, recipients: Vec<Author>) {
fail_point!("consensus::send_signed_digest", |_| ());
let msg = ConsensusMsg::SignedDigestMsg(Box::new(signed_digest));
self.send(msg, recipients).await
}

async fn broadcast_fragment(&mut self, fragment: Fragment) {
fail_point!("consensus::send::broadcast_fragment", |_| ());
let msg = ConsensusMsg::FragmentMsg(Box::new(fragment));
self.broadcast_without_self(msg).await
}

async fn broadcast_proof_of_store(&mut self, proof_of_store: ProofOfStore) {
fail_point!("consensus::send::proof_of_store", |_| ());
let msg = ConsensusMsg::ProofOfStoreMsg(Box::new(proof_of_store));
self.broadcast_without_self(msg).await
}
}

pub struct NetworkTask {
consensus_messages_tx: aptos_channel::Sender<
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
>,
quorum_store_messages_tx: aptos_channel::Sender<
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
>,
block_retrieval_tx:
aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingBlockRetrievalRequest)>,
all_events: Box<dyn Stream<Item = Event<ConsensusMsg>> + Send + Unpin>,
@@ -289,6 +344,11 @@ impl NetworkTask {
) -> (NetworkTask, NetworkReceivers) {
let (consensus_messages_tx, consensus_messages) =
aptos_channel::new(QueueStyle::LIFO, 1, Some(&counters::CONSENSUS_CHANNEL_MSGS));
let (quorum_store_messages_tx, quorum_store_messages) = aptos_channel::new(
QueueStyle::FIFO,
1000,
Review thread:

Contributor: This is a lot of messages; why do we need to buffer so many?

Contributor Author: We really shouldn't need to buffer this many, especially if back pressure is working well. However, it looks like we'll have to redo back pressure, so I'm inclined to keep this for now and revisit it later.

Contributor: This may consume a significant amount of memory if not dequeued fast enough; imagine a fragment message at the maximum message size (~64 MB) × 1000.

Contributor Author: Yeah, that's probably risky even with a single bad validator and quorum store off: at ~64 MB per message, a capacity of 1000 could pin roughly 64 GB, versus about 3.2 GB at a capacity of 50. Reduced to 50 for now.

Some(&counters::QUORUM_STORE_CHANNEL_MSGS),
);
let (block_retrieval_tx, block_retrieval) = aptos_channel::new(
QueueStyle::LIFO,
1,
@@ -298,11 +358,13 @@
(
NetworkTask {
consensus_messages_tx,
quorum_store_messages_tx,
block_retrieval_tx,
all_events,
},
NetworkReceivers {
consensus_messages,
quorum_store_messages,
block_retrieval,
},
)
@@ -315,20 +377,38 @@
counters::CONSENSUS_RECEIVED_MSGS
.with_label_values(&[msg.name()])
.inc();
if let ConsensusMsg::ProposalMsg(proposal) = &msg {
observe_block(
proposal.proposal().timestamp_usecs(),
BlockStage::NETWORK_RECEIVED,
);
}
if let Err(e) = self
.consensus_messages_tx
.push((peer_id, discriminant(&msg)), (peer_id, msg))
{
warn!(
remote_peer = peer_id,
error = ?e, "Error pushing consensus msg",
);
match msg {
quorum_store_msg @ (ConsensusMsg::SignedDigestMsg(_)
| ConsensusMsg::FragmentMsg(_)
| ConsensusMsg::BatchMsg(_)
| ConsensusMsg::ProofOfStoreMsg(_)) => {
if let Err(e) = self.quorum_store_messages_tx.push(
(peer_id, discriminant(&quorum_store_msg)),
(peer_id, quorum_store_msg),
) {
warn!(
remote_peer = peer_id,
error = ?e, "Error pushing consensus quorum store msg",
);
}
}
consensus_msg => {
if let ConsensusMsg::ProposalMsg(proposal) = &consensus_msg {
observe_block(
proposal.proposal().timestamp_usecs(),
BlockStage::NETWORK_RECEIVED,
);
}
if let Err(e) = self.consensus_messages_tx.push(
Review thread:

Contributor: This piece of code seems repetitive.

(peer_id, discriminant(&consensus_msg)),
(peer_id, consensus_msg),
) {
warn!(
remote_peer = peer_id,
error = ?e, "Error pushing consensus msg",
);
}
}
}
}
Event::RpcRequest(peer_id, msg, protocol, callback) => match msg {
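On the "repetitive" comment above, a hypothetical refactor sketch (not in this PR) that funnels both match arms through a single push-and-warn helper; the helper name is an assumption for illustration.

```rust
// Illustrative only: one helper owning the push-and-log pattern, so the
// quorum store arm and the consensus arm differ only in which sender and
// message they pass in. Uses std::mem::{discriminant, Discriminant} as the
// surrounding file already does.
fn push_or_warn(
    tx: &aptos_channel::Sender<
        (AccountAddress, Discriminant<ConsensusMsg>),
        (AccountAddress, ConsensusMsg),
    >,
    peer_id: AccountAddress,
    msg: ConsensusMsg,
) {
    // Key by (peer, message variant), matching the channel's key type, and
    // log a warning instead of propagating the error, as the original does.
    if let Err(e) = tx.push((peer_id, discriminant(&msg)), (peer_id, msg)) {
        warn!(remote_peer = peer_id, error = ?e, "Error pushing consensus msg");
    }
}
```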