Skip to content

Commit

Permalink
[qs] unify message process to not differentiate local vs remote
Browse files Browse the repository at this point in the history
This commit unifies fragment and proof processing so that we no longer differentiate
between messages destined for the local node and those destined for remote peers.
  • Loading branch information
zekun000 authored Mar 14, 2023
1 parent a6a4c74 commit 7250a5d
Show file tree
Hide file tree
Showing 26 changed files with 507 additions and 716 deletions.
75 changes: 71 additions & 4 deletions consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::common::Round;
use anyhow::Context;
use anyhow::{bail, Context};
use aptos_crypto::{bls12381, CryptoMaterialError, HashValue};
use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher};
use aptos_types::{
Expand All @@ -11,6 +11,10 @@ use aptos_types::{
};
use rand::{seq::SliceRandom, thread_rng};
use serde::{Deserialize, Serialize};
use std::{
cmp::Ordering,
fmt::{Display, Formatter},
};

#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub struct LogicalTime {
Expand All @@ -32,11 +36,59 @@ impl LogicalTime {
}
}

/// Identifier of a batch, made of a sequence number (`id`) and a random `nonce`.
///
/// `BatchId::new` starts the sequence number at 0 and `increment` advances it by one.
/// The hand-written `Ord` implementation in this file compares `id` first and then `nonce`.
#[derive(
Copy, Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash, CryptoHasher, BCSCryptoHash,
)]
pub struct BatchId {
/// Sequence number; begins at 0 for a freshly created id and is bumped by `increment`.
pub id: u64,
/// A random number that is stored in the DB and updated only if the value does not exist in
/// the DB: (a) at the start of an epoch, or (b) the DB was wiped. When the nonce is updated,
/// id starts again at 0.
pub nonce: u64,
}

impl BatchId {
pub fn new(nonce: u64) -> Self {
Self { id: 0, nonce }
}

pub fn new_for_test(id: u64) -> Self {
Self { id, nonce: 0 }
}

pub fn increment(&mut self) {
self.id += 1;
}
}

/// Partial ordering for `BatchId`; always `Some`, because it defers to the total order
/// defined by the `Ord` implementation.
impl PartialOrd for BatchId {
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}

/// Total order for `BatchId`: compares `id` first and uses `nonce` only to break ties.
impl Ord for BatchId {
    fn cmp(&self, rhs: &Self) -> Ordering {
        self.id
            .cmp(&rhs.id)
            .then_with(|| self.nonce.cmp(&rhs.nonce))
    }
}

/// Renders a `BatchId` as `(id, nonce)`.
impl Display for BatchId {
    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
        let Self { id, nonce } = self;
        write!(f, "({}, {})", id, nonce)
    }
}

#[derive(
Clone, Debug, Deserialize, Serialize, CryptoHasher, BCSCryptoHash, PartialEq, Eq, Hash,
)]
pub struct SignedDigestInfo {
pub batch_author: PeerId,
pub batch_id: BatchId,
pub digest: HashValue,
pub expiration: LogicalTime,
pub num_txns: u64,
Expand All @@ -46,13 +98,15 @@ pub struct SignedDigestInfo {
impl SignedDigestInfo {
pub fn new(
batch_author: PeerId,
batch_id: BatchId,
digest: HashValue,
expiration: LogicalTime,
num_txns: u64,
num_bytes: u64,
) -> Self {
Self {
batch_author,
batch_id,
digest,
expiration,
num_txns,
Expand All @@ -72,14 +126,22 @@ pub struct SignedDigest {
impl SignedDigest {
pub fn new(
batch_author: PeerId,
batch_id: BatchId,
epoch: u64,
digest: HashValue,
expiration: LogicalTime,
num_txns: u64,
num_bytes: u64,
validator_signer: &ValidatorSigner,
) -> Result<Self, CryptoMaterialError> {
let info = SignedDigestInfo::new(batch_author, digest, expiration, num_txns, num_bytes);
let info = SignedDigestInfo::new(
batch_author,
batch_id,
digest,
expiration,
num_txns,
num_bytes,
);
let signature = validator_signer.sign(&info)?;

Ok(Self {
Expand All @@ -98,8 +160,12 @@ impl SignedDigest {
self.epoch
}

pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
Ok(validator.verify(self.signer, &self.info, &self.signature)?)
pub fn verify(&self, sender: PeerId, validator: &ValidatorVerifier) -> anyhow::Result<()> {
if sender == self.signer {
Ok(validator.verify(self.signer, &self.info, &self.signature)?)
} else {
bail!("Sender {} mismatch signer {}", sender, self.signer);
}
}

pub fn info(&self) -> &SignedDigestInfo {
Expand All @@ -117,6 +183,7 @@ impl SignedDigest {

/// Error kinds related to [`SignedDigest`] handling.
///
/// NOTE(review): the per-variant descriptions below are inferred from the variant names only;
/// confirm them against the code that returns these errors.
///
/// `Eq` is derived alongside `PartialEq` because the enum is fieldless, so equality is total.
#[derive(Debug, PartialEq, Eq)]
pub enum SignedDigestError {
    /// Presumably: the digest's author does not match the expected one — TODO confirm.
    WrongAuthor,
    /// Presumably: the signed info does not match the expected info — TODO confirm.
    WrongInfo,
    /// Presumably: a signature from the same signer was already recorded — TODO confirm.
    DuplicatedSignature,
}
Expand Down
18 changes: 5 additions & 13 deletions consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,9 @@ impl NetworkSender {

/// Tries to send the given msg to all the participants.
///
/// The future is fulfilled as soon as the message put into the mpsc channel to network
/// internal(to provide back pressure), it does not indicate the message is delivered or sent
/// out. It does not give indication about when the message is delivered to the recipients,
/// as well as there is no indication about the network failures.
/// The future is fulfilled as soon as the message is put into the mpsc channel to the network
/// internals (to provide back pressure); it does not indicate that the message has been
/// delivered or sent out.
async fn broadcast(&mut self, msg: ConsensusMsg) {
fail_point!("consensus::send::any", |_| ());
// Directly send the message to ourself without going through network.
Expand All @@ -186,13 +185,6 @@ impl NetworkSender {
error!("Error broadcasting to self: {:?}", err);
}

self.broadcast_without_self(msg).await;
}

/// Tries to send the given msg to all the participants, excluding self.
async fn broadcast_without_self(&mut self, msg: ConsensusMsg) {
fail_point!("consensus::send::any", |_| ());

// Get the list of validators excluding our own account address. Note the
// ordering is not important in this case.
let self_author = self.author;
Expand Down Expand Up @@ -362,13 +354,13 @@ impl QuorumStoreSender for NetworkSender {
async fn broadcast_fragment(&mut self, fragment: Fragment) {
fail_point!("consensus::send::broadcast_fragment", |_| ());
let msg = ConsensusMsg::FragmentMsg(Box::new(fragment));
self.broadcast_without_self(msg).await
self.broadcast(msg).await
}

async fn broadcast_proof_of_store(&mut self, proof_of_store: ProofOfStore) {
fail_point!("consensus::send::proof_of_store", |_| ());
let msg = ConsensusMsg::ProofOfStoreMsg(Box::new(proof_of_store));
self.broadcast_without_self(msg).await
self.broadcast(msg).await
}
}

Expand Down
15 changes: 7 additions & 8 deletions consensus/src/payload_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

use crate::{
network::NetworkSender,
quorum_store::{batch_store::BatchStore, quorum_store_coordinator::CoordinatorCommand},
quorum_store::{
batch_store::{BatchReader, BatchStore},
quorum_store_coordinator::CoordinatorCommand,
},
};
use aptos_consensus_types::{
block::Block,
Expand All @@ -12,7 +15,6 @@ use aptos_consensus_types::{
};
use aptos_crypto::HashValue;
use aptos_executor_types::{Error::DataNotFound, *};
use aptos_infallible::Mutex;
use aptos_logger::prelude::*;
use aptos_types::transaction::SignedTransaction;
use futures::{channel::mpsc::Sender, SinkExt};
Expand All @@ -23,10 +25,7 @@ use tokio::sync::oneshot;
/// If QuorumStore is enabled, has to ask BatchReader for the transaction behind the proofs of availability in the payload.
pub enum PayloadManager {
DirectMempool,
InQuorumStore(
Arc<BatchStore<NetworkSender>>,
Mutex<Sender<CoordinatorCommand>>,
),
InQuorumStore(Arc<BatchStore<NetworkSender>>, Sender<CoordinatorCommand>),
}

impl PayloadManager {
Expand Down Expand Up @@ -60,6 +59,7 @@ impl PayloadManager {
match self {
PayloadManager::DirectMempool => {},
PayloadManager::InQuorumStore(batch_store, coordinator_tx) => {
// TODO: move this to somewhere in quorum store, so this can be a batch reader
batch_store.update_certified_round(logical_time).await;

let digests: Vec<HashValue> = payloads
Expand All @@ -73,9 +73,8 @@ impl PayloadManager {
.map(|proof| *proof.digest())
.collect();

let mut tx = coordinator_tx.lock().clone();
let mut tx = coordinator_tx.clone();

// TODO: don't even need to warn on fail?
if let Err(e) = tx
.send(CoordinatorCommand::CommitNotification(
logical_time,
Expand Down
17 changes: 6 additions & 11 deletions consensus/src/quorum_store/batch_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
#![allow(dead_code)]
#![allow(unused_variables)]

use crate::quorum_store::{
counters,
types::{BatchId, SerializedTransaction},
};
use crate::quorum_store::{counters, types::SerializedTransaction};
use aptos_consensus_types::proof_of_store::BatchId;
use aptos_crypto::{hash::DefaultHasher, HashValue};
use aptos_logger::{error, warn};
use aptos_types::transaction::SignedTransaction;
Expand Down Expand Up @@ -145,8 +143,8 @@ impl BatchAggregator {
if Self::is_new_batch(batch_id, self_batch_id) {
self.batch_state.is_some() || fragment_id > 0
} else {
assert!(
batch_id == self_batch_id,
assert_eq!(
batch_id, self_batch_id,
"Missed fragment called with an outdated fragment"
);
fragment_id > self.next_fragment_id()
Expand Down Expand Up @@ -207,11 +205,8 @@ impl BatchAggregator {
self.batch_state = Some(IncrementalBatchState::new(self.max_batch_bytes));
}

if self.batch_state.is_some() {
self.batch_state
.as_mut()
.unwrap()
.append_transactions(transactions)?
if let Some(batch_state) = &mut self.batch_state {
batch_state.append_transactions(transactions)?
}
Ok(())
}
Expand Down
Loading

0 comments on commit 7250a5d

Please sign in to comment.