Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[qs] another refactor #7045

Merged
merged 5 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 71 additions & 4 deletions consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::common::Round;
use anyhow::Context;
use anyhow::{bail, Context};
use aptos_crypto::{bls12381, CryptoMaterialError, HashValue};
use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher};
use aptos_types::{
Expand All @@ -11,6 +11,10 @@ use aptos_types::{
};
use rand::{seq::SliceRandom, thread_rng};
use serde::{Deserialize, Serialize};
use std::{
cmp::Ordering,
fmt::{Display, Formatter},
};

#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub struct LogicalTime {
Expand All @@ -32,11 +36,59 @@ impl LogicalTime {
}
}

#[derive(
Copy, Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash, CryptoHasher, BCSCryptoHash,
)]
pub struct BatchId {
pub id: u64,
/// A random number that is stored in the DB and updated only if the value does not exist in
/// the DB: (a) at the start of an epoch, or (b) the DB was wiped. When the nonce is updated,
/// id starts again at 0.
pub nonce: u64,
}

impl BatchId {
pub fn new(nonce: u64) -> Self {
Self { id: 0, nonce }
}

pub fn new_for_test(id: u64) -> Self {
Self { id, nonce: 0 }
}

pub fn increment(&mut self) {
self.id += 1;
}
}

impl PartialOrd<Self> for BatchId {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for BatchId {
fn cmp(&self, other: &Self) -> Ordering {
match self.id.cmp(&other.id) {
Ordering::Equal => {},
ordering => return ordering,
}
self.nonce.cmp(&other.nonce)
}
}

impl Display for BatchId {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "({}, {})", self.id, self.nonce)
}
}

#[derive(
Clone, Debug, Deserialize, Serialize, CryptoHasher, BCSCryptoHash, PartialEq, Eq, Hash,
)]
pub struct SignedDigestInfo {
pub batch_author: PeerId,
pub batch_id: BatchId,
pub digest: HashValue,
pub expiration: LogicalTime,
pub num_txns: u64,
Expand All @@ -46,13 +98,15 @@ pub struct SignedDigestInfo {
impl SignedDigestInfo {
pub fn new(
batch_author: PeerId,
batch_id: BatchId,
digest: HashValue,
expiration: LogicalTime,
num_txns: u64,
num_bytes: u64,
) -> Self {
Self {
batch_author,
batch_id,
digest,
expiration,
num_txns,
Expand All @@ -72,14 +126,22 @@ pub struct SignedDigest {
impl SignedDigest {
pub fn new(
batch_author: PeerId,
batch_id: BatchId,
epoch: u64,
digest: HashValue,
expiration: LogicalTime,
num_txns: u64,
num_bytes: u64,
validator_signer: &ValidatorSigner,
) -> Result<Self, CryptoMaterialError> {
let info = SignedDigestInfo::new(batch_author, digest, expiration, num_txns, num_bytes);
let info = SignedDigestInfo::new(
batch_author,
batch_id,
digest,
expiration,
num_txns,
num_bytes,
);
let signature = validator_signer.sign(&info)?;

Ok(Self {
Expand All @@ -98,8 +160,12 @@ impl SignedDigest {
self.epoch
}

pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> {
Ok(validator.verify(self.signer, &self.info, &self.signature)?)
pub fn verify(&self, sender: PeerId, validator: &ValidatorVerifier) -> anyhow::Result<()> {
if sender == self.signer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool!

Ok(validator.verify(self.signer, &self.info, &self.signature)?)
} else {
bail!("Sender {} mismatch signer {}", sender, self.signer);
}
}

pub fn info(&self) -> &SignedDigestInfo {
Expand All @@ -117,6 +183,7 @@ impl SignedDigest {

#[derive(Debug, PartialEq)]
pub enum SignedDigestError {
WrongAuthor,
WrongInfo,
DuplicatedSignature,
}
Expand Down
18 changes: 5 additions & 13 deletions consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,9 @@ impl NetworkSender {

/// Tries to send the given msg to all the participants.
///
/// The future is fulfilled as soon as the message put into the mpsc channel to network
/// internal(to provide back pressure), it does not indicate the message is delivered or sent
/// out. It does not give indication about when the message is delivered to the recipients,
/// as well as there is no indication about the network failures.
/// The future is fulfilled as soon as the message is put into the mpsc channel to network
/// internal (to provide back pressure), it does not indicate the message is delivered or sent
/// out.
async fn broadcast(&mut self, msg: ConsensusMsg) {
fail_point!("consensus::send::any", |_| ());
// Directly send the message to ourself without going through network.
Expand All @@ -186,13 +185,6 @@ impl NetworkSender {
error!("Error broadcasting to self: {:?}", err);
}

self.broadcast_without_self(msg).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you didn't touch it, but if you get a chance could you update the function comment, "internal(" needs space, "message put" probably message is put or sent, and in general can probably be expressed more concisely.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should delete this anyway

}

/// Tries to send the given msg to all the participants, excluding self.
async fn broadcast_without_self(&mut self, msg: ConsensusMsg) {
fail_point!("consensus::send::any", |_| ());

// Get the list of validators excluding our own account address. Note the
// ordering is not important in this case.
let self_author = self.author;
Expand Down Expand Up @@ -362,13 +354,13 @@ impl QuorumStoreSender for NetworkSender {
async fn broadcast_fragment(&mut self, fragment: Fragment) {
fail_point!("consensus::send::broadcast_fragment", |_| ());
let msg = ConsensusMsg::FragmentMsg(Box::new(fragment));
self.broadcast_without_self(msg).await
self.broadcast(msg).await
}

async fn broadcast_proof_of_store(&mut self, proof_of_store: ProofOfStore) {
fail_point!("consensus::send::proof_of_store", |_| ());
let msg = ConsensusMsg::ProofOfStoreMsg(Box::new(proof_of_store));
self.broadcast_without_self(msg).await
self.broadcast(msg).await
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can delete the broadcast_without_self method since this was the only use case.

}
}

Expand Down
15 changes: 7 additions & 8 deletions consensus/src/payload_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@

use crate::{
network::NetworkSender,
quorum_store::{batch_store::BatchStore, quorum_store_coordinator::CoordinatorCommand},
quorum_store::{
batch_store::{BatchReader, BatchStore},
quorum_store_coordinator::CoordinatorCommand,
},
};
use aptos_consensus_types::{
block::Block,
Expand All @@ -12,7 +15,6 @@ use aptos_consensus_types::{
};
use aptos_crypto::HashValue;
use aptos_executor_types::{Error::DataNotFound, *};
use aptos_infallible::Mutex;
use aptos_logger::prelude::*;
use aptos_types::transaction::SignedTransaction;
use futures::{channel::mpsc::Sender, SinkExt};
Expand All @@ -23,10 +25,7 @@ use tokio::sync::oneshot;
/// If QuorumStore is enabled, has to ask BatchReader for the transaction behind the proofs of availability in the payload.
pub enum PayloadManager {
DirectMempool,
InQuorumStore(
Arc<BatchStore<NetworkSender>>,
Mutex<Sender<CoordinatorCommand>>,
),
InQuorumStore(Arc<BatchStore<NetworkSender>>, Sender<CoordinatorCommand>),
}

impl PayloadManager {
Expand Down Expand Up @@ -60,6 +59,7 @@ impl PayloadManager {
match self {
PayloadManager::DirectMempool => {},
PayloadManager::InQuorumStore(batch_store, coordinator_tx) => {
// TODO: move this to somewhere in quorum store, so this can be a batch reader
batch_store.update_certified_round(logical_time).await;

let digests: Vec<HashValue> = payloads
Expand All @@ -73,9 +73,8 @@ impl PayloadManager {
.map(|proof| *proof.digest())
.collect();

let mut tx = coordinator_tx.lock().clone();
let mut tx = coordinator_tx.clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the TODO below deprecated?


// TODO: don't even need to warn on fail?
if let Err(e) = tx
.send(CoordinatorCommand::CommitNotification(
logical_time,
Expand Down
17 changes: 6 additions & 11 deletions consensus/src/quorum_store/batch_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@
#![allow(dead_code)]
#![allow(unused_variables)]

use crate::quorum_store::{
counters,
types::{BatchId, SerializedTransaction},
};
use crate::quorum_store::{counters, types::SerializedTransaction};
use aptos_consensus_types::proof_of_store::BatchId;
use aptos_crypto::{hash::DefaultHasher, HashValue};
use aptos_logger::{error, warn};
use aptos_types::transaction::SignedTransaction;
Expand Down Expand Up @@ -145,8 +143,8 @@ impl BatchAggregator {
if Self::is_new_batch(batch_id, self_batch_id) {
self.batch_state.is_some() || fragment_id > 0
} else {
assert!(
batch_id == self_batch_id,
assert_eq!(
batch_id, self_batch_id,
"Missed fragment called with an outdated fragment"
);
fragment_id > self.next_fragment_id()
Expand Down Expand Up @@ -207,11 +205,8 @@ impl BatchAggregator {
self.batch_state = Some(IncrementalBatchState::new(self.max_batch_bytes));
}

if self.batch_state.is_some() {
self.batch_state
.as_mut()
.unwrap()
.append_transactions(transactions)?
if let Some(batch_state) = &mut self.batch_state {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ugh, thanks for showing the correct way in the code we will delete shortly anyway ;-)

batch_state.append_transactions(transactions)?
}
Ok(())
}
Expand Down
Loading