diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs index d4f2fa603dcd7..e295e5972d005 100644 --- a/consensus/consensus-types/src/proof_of_store.rs +++ b/consensus/consensus-types/src/proof_of_store.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::common::Round; -use anyhow::Context; +use anyhow::{bail, Context}; use aptos_crypto::{bls12381, CryptoMaterialError, HashValue}; use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher}; use aptos_types::{ @@ -11,6 +11,10 @@ use aptos_types::{ }; use rand::{seq::SliceRandom, thread_rng}; use serde::{Deserialize, Serialize}; +use std::{ + cmp::Ordering, + fmt::{Display, Formatter}, +}; #[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)] pub struct LogicalTime { @@ -32,11 +36,59 @@ impl LogicalTime { } } +#[derive( + Copy, Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash, CryptoHasher, BCSCryptoHash, +)] +pub struct BatchId { + pub id: u64, + /// A random number that is stored in the DB and updated only if the value does not exist in + /// the DB: (a) at the start of an epoch, or (b) the DB was wiped. When the nonce is updated, + /// id starts again at 0. + pub nonce: u64, +} + +impl BatchId { + pub fn new(nonce: u64) -> Self { + Self { id: 0, nonce } + } + + pub fn new_for_test(id: u64) -> Self { + Self { id, nonce: 0 } + } + + pub fn increment(&mut self) { + self.id += 1; + } +} + +impl PartialOrd for BatchId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for BatchId { + fn cmp(&self, other: &Self) -> Ordering { + match self.id.cmp(&other.id) { + Ordering::Equal => {}, + ordering => return ordering, + } + self.nonce.cmp(&other.nonce) + } +} + +impl Display for BatchId { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "({}, {})", self.id, self.nonce) + } +} + #[derive( Clone, Debug, Deserialize, Serialize, CryptoHasher, BCSCryptoHash, PartialEq, Eq, Hash, )] pub struct SignedDigestInfo { pub batch_author: PeerId, + pub batch_id: BatchId, pub digest: HashValue, pub expiration: LogicalTime, pub num_txns: u64, @@ -46,6 +98,7 @@ pub struct SignedDigestInfo { impl SignedDigestInfo { pub fn new( batch_author: PeerId, + batch_id: BatchId, digest: HashValue, expiration: LogicalTime, num_txns: u64, @@ -53,6 +106,7 @@ impl SignedDigestInfo { ) -> Self { Self { batch_author, + batch_id, digest, expiration, num_txns, @@ -72,6 +126,7 @@ pub struct SignedDigest { impl SignedDigest { pub fn new( batch_author: PeerId, + batch_id: BatchId, epoch: u64, digest: HashValue, expiration: LogicalTime, @@ -79,7 +134,14 @@ impl SignedDigest { num_bytes: u64, validator_signer: &ValidatorSigner, ) -> Result { - let info = SignedDigestInfo::new(batch_author, digest, expiration, num_txns, num_bytes); + let info = SignedDigestInfo::new( + batch_author, + batch_id, + digest, + expiration, + num_txns, + num_bytes, + ); let signature = validator_signer.sign(&info)?; Ok(Self { @@ -98,8 +160,12 @@ impl SignedDigest { self.epoch } - pub fn verify(&self, validator: &ValidatorVerifier) -> anyhow::Result<()> { - Ok(validator.verify(self.signer, &self.info, &self.signature)?) + pub fn verify(&self, sender: PeerId, validator: &ValidatorVerifier) -> anyhow::Result<()> { + if sender == self.signer { + Ok(validator.verify(self.signer, &self.info, &self.signature)?) 
+ } else { + bail!("Sender {} mismatch signer {}", sender, self.signer); + } } pub fn info(&self) -> &SignedDigestInfo { @@ -117,6 +183,7 @@ impl SignedDigest { #[derive(Debug, PartialEq)] pub enum SignedDigestError { + WrongAuthor, WrongInfo, DuplicatedSignature, } diff --git a/consensus/src/network.rs b/consensus/src/network.rs index 6a5d0fc148a8c..5365877d08a6f 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -174,10 +174,9 @@ impl NetworkSender { /// Tries to send the given msg to all the participants. /// - /// The future is fulfilled as soon as the message put into the mpsc channel to network - /// internal(to provide back pressure), it does not indicate the message is delivered or sent - /// out. It does not give indication about when the message is delivered to the recipients, - /// as well as there is no indication about the network failures. + /// The future is fulfilled as soon as the message is put into the mpsc channel to network + /// internal (to provide back pressure), it does not indicate the message is delivered or sent + /// out. async fn broadcast(&mut self, msg: ConsensusMsg) { fail_point!("consensus::send::any", |_| ()); // Directly send the message to ourself without going through network. @@ -186,13 +185,6 @@ impl NetworkSender { error!("Error broadcasting to self: {:?}", err); } - self.broadcast_without_self(msg).await; - } - - /// Tries to send the given msg to all the participants, excluding self. - async fn broadcast_without_self(&mut self, msg: ConsensusMsg) { - fail_point!("consensus::send::any", |_| ()); - // Get the list of validators excluding our own account address. Note the // ordering is not important in this case. let self_author = self.author; @@ -362,13 +354,13 @@ impl QuorumStoreSender for NetworkSender { async fn broadcast_fragment(&mut self, fragment: Fragment) { fail_point!("consensus::send::broadcast_fragment", |_| ()); let msg = ConsensusMsg::FragmentMsg(Box::new(fragment)); - self.broadcast_without_self(msg).await + self.broadcast(msg).await } async fn broadcast_proof_of_store(&mut self, proof_of_store: ProofOfStore) { fail_point!("consensus::send::proof_of_store", |_| ()); let msg = ConsensusMsg::ProofOfStoreMsg(Box::new(proof_of_store)); - self.broadcast_without_self(msg).await + self.broadcast(msg).await } } diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager.rs index 9440b8df4807b..7f825d3db1cd6 100644 --- a/consensus/src/payload_manager.rs +++ b/consensus/src/payload_manager.rs @@ -3,7 +3,10 @@ use crate::{ network::NetworkSender, - quorum_store::{batch_store::BatchStore, quorum_store_coordinator::CoordinatorCommand}, + quorum_store::{ + batch_store::{BatchReader, BatchStore}, + quorum_store_coordinator::CoordinatorCommand, + }, }; use aptos_consensus_types::{ block::Block, @@ -12,7 +15,6 @@ use aptos_consensus_types::{ }; use aptos_crypto::HashValue; use aptos_executor_types::{Error::DataNotFound, *}; -use aptos_infallible::Mutex; use aptos_logger::prelude::*; use aptos_types::transaction::SignedTransaction; use futures::{channel::mpsc::Sender, SinkExt}; @@ -23,10 +25,7 @@ use tokio::sync::oneshot; /// If QuorumStore is enabled, has to ask BatchReader for the transaction behind the proofs of availability in the payload. 
pub enum PayloadManager { DirectMempool, - InQuorumStore( - Arc>, - Mutex>, - ), + InQuorumStore(Arc>, Sender), } impl PayloadManager { @@ -60,6 +59,7 @@ impl PayloadManager { match self { PayloadManager::DirectMempool => {}, PayloadManager::InQuorumStore(batch_store, coordinator_tx) => { + // TODO: move this to somewhere in quorum store, so this can be a batch reader batch_store.update_certified_round(logical_time).await; let digests: Vec = payloads @@ -73,9 +73,8 @@ impl PayloadManager { .map(|proof| *proof.digest()) .collect(); - let mut tx = coordinator_tx.lock().clone(); + let mut tx = coordinator_tx.clone(); - // TODO: don't even need to warn on fail? if let Err(e) = tx .send(CoordinatorCommand::CommitNotification( logical_time, diff --git a/consensus/src/quorum_store/batch_aggregator.rs b/consensus/src/quorum_store/batch_aggregator.rs index 6eb7fb0ff0cc1..8cb244e5c0ef5 100644 --- a/consensus/src/quorum_store/batch_aggregator.rs +++ b/consensus/src/quorum_store/batch_aggregator.rs @@ -4,10 +4,8 @@ #![allow(dead_code)] #![allow(unused_variables)] -use crate::quorum_store::{ - counters, - types::{BatchId, SerializedTransaction}, -}; +use crate::quorum_store::{counters, types::SerializedTransaction}; +use aptos_consensus_types::proof_of_store::BatchId; use aptos_crypto::{hash::DefaultHasher, HashValue}; use aptos_logger::{error, warn}; use aptos_types::transaction::SignedTransaction; @@ -145,8 +143,8 @@ impl BatchAggregator { if Self::is_new_batch(batch_id, self_batch_id) { self.batch_state.is_some() || fragment_id > 0 } else { - assert!( - batch_id == self_batch_id, + assert_eq!( + batch_id, self_batch_id, "Missed fragment called with an outdated fragment" ); fragment_id > self.next_fragment_id() @@ -207,11 +205,8 @@ impl BatchAggregator { self.batch_state = Some(IncrementalBatchState::new(self.max_batch_bytes)); } - if self.batch_state.is_some() { - self.batch_state - .as_mut() - .unwrap() - .append_transactions(transactions)? + if let Some(batch_state) = &mut self.batch_state { + batch_state.append_transactions(transactions)? 
} Ok(()) } diff --git a/consensus/src/quorum_store/batch_coordinator.rs b/consensus/src/quorum_store/batch_coordinator.rs index bcfa34cc400e7..5df89064b6eec 100644 --- a/consensus/src/quorum_store/batch_coordinator.rs +++ b/consensus/src/quorum_store/batch_coordinator.rs @@ -7,43 +7,27 @@ use crate::{ batch_aggregator::BatchAggregator, batch_store::{BatchStore, PersistRequest}, counters, - proof_coordinator::{ProofCoordinatorCommand, ProofReturnChannel}, - types::{BatchId, Fragment, SerializedTransaction}, + types::Fragment, }, }; -use aptos_consensus_types::proof_of_store::{LogicalTime, SignedDigestInfo}; use aptos_logger::prelude::*; use aptos_types::PeerId; use std::{collections::HashMap, sync::Arc}; -use tokio::sync::{ - mpsc::{Receiver, Sender}, - oneshot, -}; +use tokio::sync::{mpsc::Receiver, oneshot}; #[derive(Debug)] pub enum BatchCoordinatorCommand { - AppendToBatch(Vec, BatchId), - EndBatch( - Vec, - BatchId, - LogicalTime, - ProofReturnChannel, - ), Shutdown(oneshot::Sender<()>), - RemoteFragment(Box), + AppendFragment(Box), } pub struct BatchCoordinator { epoch: u64, my_peer_id: PeerId, network_sender: NetworkSender, - command_rx: Receiver, batch_store: Arc>, - proof_coordinator_tx: Sender, max_batch_bytes: usize, - remote_batch_aggregators: HashMap, - local_batch_aggregator: BatchAggregator, - local_fragment_id: usize, + batch_aggregators: HashMap, } impl BatchCoordinator { @@ -51,113 +35,23 @@ impl BatchCoordinator { epoch: u64, //TODO: pass the epoch config my_peer_id: PeerId, network_sender: NetworkSender, - wrapper_command_rx: Receiver, batch_store: Arc>, - proof_coordinator_tx: Sender, max_batch_bytes: usize, ) -> Self { Self { epoch, my_peer_id, network_sender, - command_rx: wrapper_command_rx, batch_store, - proof_coordinator_tx, max_batch_bytes, - remote_batch_aggregators: HashMap::new(), - local_batch_aggregator: BatchAggregator::new(max_batch_bytes), - local_fragment_id: 0, - } - } - - /// Aggregate & compute rolling digest, synchronously by worker. - fn handle_append_to_batch( - &mut self, - fragment_payload: Vec, - batch_id: BatchId, - ) -> Fragment { - match self.local_batch_aggregator.append_transactions( - batch_id, - self.local_fragment_id, - fragment_payload.clone(), - ) { - Ok(()) => Fragment::new( - self.epoch, - batch_id, - self.local_fragment_id, - fragment_payload, - None, - self.my_peer_id, - ), - Err(e) => { - unreachable!( - "[QuorumStore] Aggregation failed for own fragments with error {:?}", - e - ); - }, - } - } - - /// Finalize the batch & digest, synchronously by worker. 
- async fn handle_end_batch( - &mut self, - fragment_payload: Vec, - batch_id: BatchId, - expiration: LogicalTime, - proof_tx: ProofReturnChannel, - ) -> (PersistRequest, Fragment) { - match self.local_batch_aggregator.end_batch( - batch_id, - self.local_fragment_id, - fragment_payload.clone(), - ) { - Ok((num_bytes, payload, digest)) => { - let fragment = Fragment::new( - self.epoch, - batch_id, - self.local_fragment_id, - fragment_payload, - Some(expiration), - self.my_peer_id, - ); - - self.proof_coordinator_tx - .send(ProofCoordinatorCommand::InitProof( - SignedDigestInfo::new( - self.my_peer_id, - digest, - expiration, - payload.len() as u64, - num_bytes as u64, - ), - fragment.batch_id(), - proof_tx, - )) - .await - .expect("Failed to send to ProofBuilder"); - - let persist_request = PersistRequest::new( - self.my_peer_id, - payload.clone(), - digest, - num_bytes, - expiration, - ); - (persist_request, fragment) - }, - Err(e) => { - unreachable!( - "[QuorumStore] Aggregation failed for own fragments with error {:?}", - e - ); - }, + batch_aggregators: HashMap::new(), } } async fn handle_fragment(&mut self, fragment: Fragment) -> Option { let source = fragment.source(); let entry = self - .remote_batch_aggregators + .batch_aggregators .entry(source) .or_insert_with(|| BatchAggregator::new(self.max_batch_bytes)); if let Some(expiration) = fragment.maybe_expiration() { @@ -169,6 +63,7 @@ impl BatchCoordinator { fragment.batch_id(), fragment.fragment_id(), ); + let batch_id = fragment.batch_id(); if expiration.epoch() == self.epoch { match entry.end_batch( fragment.batch_id(), @@ -176,8 +71,9 @@ impl BatchCoordinator { fragment.into_transactions(), ) { Ok((num_bytes, payload, digest)) => { - let persist_request = - PersistRequest::new(source, payload, digest, num_bytes, expiration); + let persist_request = PersistRequest::new( + source, batch_id, payload, digest, num_bytes, expiration, + ); return Some(persist_request); }, Err(e) => { @@ -220,8 +116,8 @@ impl BatchCoordinator { }); } - pub(crate) async fn start(mut self) { - while let Some(command) = self.command_rx.recv().await { + pub(crate) async fn start(mut self, mut command_rx: Receiver) { + while let Some(command) = command_rx.recv().await { match command { BatchCoordinatorCommand::Shutdown(ack_tx) => { ack_tx @@ -229,32 +125,7 @@ impl BatchCoordinator { .expect("Failed to send shutdown ack to QuorumStoreCoordinator"); break; }, - BatchCoordinatorCommand::AppendToBatch(fragment_payload, batch_id) => { - trace!("QS: append to batch cmd received, batch id {}", batch_id); - let msg = self.handle_append_to_batch(fragment_payload, batch_id); - self.network_sender.broadcast_fragment(msg).await; - - self.local_fragment_id += 1; - }, - BatchCoordinatorCommand::EndBatch( - fragment_payload, - batch_id, - logical_time, - proof_tx, - ) => { - debug!("QS: end batch cmd received, batch id = {}", batch_id); - let (persist_request, fragment) = self - .handle_end_batch(fragment_payload, batch_id, logical_time, proof_tx) - .await; - - self.network_sender.broadcast_fragment(fragment).await; - self.persist_and_send_digest(persist_request); - - counters::NUM_FRAGMENT_PER_BATCH.observe((self.local_fragment_id + 1) as f64); - - self.local_fragment_id = 0; - }, - BatchCoordinatorCommand::RemoteFragment(fragment) => { + BatchCoordinatorCommand::AppendFragment(fragment) => { if let Some(persist_request) = self.handle_fragment(*fragment).await { self.persist_and_send_digest(persist_request); } diff --git a/consensus/src/quorum_store/batch_generator.rs 
b/consensus/src/quorum_store/batch_generator.rs index b205942fa87db..ac2ac4c9f2955 100644 --- a/consensus/src/quorum_store/batch_generator.rs +++ b/consensus/src/quorum_store/batch_generator.rs @@ -2,44 +2,38 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ monitor, + network::{NetworkSender, QuorumStoreSender}, quorum_store::{ - batch_coordinator::BatchCoordinatorCommand, counters, quorum_store_db::QuorumStoreStorage, - types::BatchId, + types::Fragment, utils::{BatchBuilder, MempoolProxy, RoundExpirations}, }, }; use aptos_config::config::QuorumStoreConfig; use aptos_consensus_types::{ common::TransactionSummary, - proof_of_store::{LogicalTime, ProofOfStore}, + proof_of_store::{BatchId, LogicalTime}, }; use aptos_logger::prelude::*; use aptos_mempool::QuorumStoreRequest; -use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; -use futures_channel::{mpsc::Sender, oneshot}; +use aptos_types::PeerId; +use futures_channel::mpsc::Sender; use rand::{thread_rng, RngCore}; use std::{ collections::HashMap, sync::Arc, time::{Duration, Instant}, }; -use tokio::{sync::mpsc::Sender as TokioSender, time::Interval}; - -type ProofCompletedChannel = oneshot::Receiver>; +use tokio::time::Interval; #[derive(Debug)] pub enum BatchGeneratorCommand { CommitNotification(LogicalTime), + ProofExpiration(Vec), Shutdown(tokio::sync::oneshot::Sender<()>), } -#[derive(Debug, PartialEq, Eq)] -pub enum ProofError { - Timeout(BatchId), -} - #[derive(Copy, Clone, PartialEq, Eq)] pub struct BackPressure { pub txn_count: bool, @@ -47,12 +41,13 @@ pub struct BackPressure { } pub struct BatchGenerator { + epoch: u64, + my_peer_id: PeerId, db: Arc, config: QuorumStoreConfig, mempool_proxy: MempoolProxy, - batch_coordinator_tx: TokioSender, batches_in_progress: HashMap>, - batch_expirations: RoundExpirations, + batch_round_expirations: RoundExpirations, batch_builder: BatchBuilder, latest_logical_time: LogicalTime, last_end_batch_time: Instant, @@ -63,10 +58,10 @@ pub struct BatchGenerator { impl BatchGenerator { pub(crate) fn new( epoch: u64, + my_peer_id: PeerId, config: QuorumStoreConfig, db: Arc, mempool_tx: Sender, - batch_coordinator_tx: TokioSender, mempool_txn_pull_timeout_ms: u64, ) -> Self { let batch_id = if let Some(mut id) = db @@ -87,12 +82,13 @@ impl BatchGenerator { let max_batch_bytes = config.max_batch_bytes; Self { + epoch, + my_peer_id, db, config, mempool_proxy: MempoolProxy::new(mempool_tx, mempool_txn_pull_timeout_ms), - batch_coordinator_tx, batches_in_progress: HashMap::new(), - batch_expirations: RoundExpirations::new(), + batch_round_expirations: RoundExpirations::new(), batch_builder: BatchBuilder::new(batch_id, max_batch_bytes), latest_logical_time: LogicalTime::new(epoch, 0), last_end_batch_time: Instant::now(), @@ -103,10 +99,7 @@ impl BatchGenerator { } } - pub(crate) async fn handle_scheduled_pull( - &mut self, - max_count: u64, - ) -> Option { + pub(crate) async fn handle_scheduled_pull(&mut self, max_count: u64) -> Option { // TODO: as an optimization, we could filter out the txns that have expired let mut exclude_txns: Vec<_> = self @@ -119,7 +112,6 @@ impl BatchGenerator { trace!("QS: excluding txs len: {:?}", exclude_txns.len()); let mut end_batch = false; - // TODO: size and unwrap or not? 
let pulled_txns = self .mempool_proxy .pull_internal( @@ -131,7 +123,7 @@ impl BatchGenerator { exclude_txns, ) .await - .unwrap(); + .unwrap_or_default(); trace!("QS: pulled_txns len: {:?}", pulled_txns.len()); if pulled_txns.is_empty() { @@ -141,34 +133,29 @@ impl BatchGenerator { counters::PULLED_TXNS_NUM.observe(pulled_txns.len() as f64); } - for txn in pulled_txns { - if !self - .batch_builder - .append_transaction(&txn, max_count as usize) - { - end_batch = true; - break; - } + if !self + .batch_builder + .append_transactions(&pulled_txns, max_count as usize) + || self.last_end_batch_time.elapsed().as_millis() > self.config.end_batch_ms as u128 + { + end_batch = true; } let serialized_txns = self.batch_builder.take_serialized_txns(); - if self.last_end_batch_time.elapsed().as_millis() > self.config.end_batch_ms as u128 { - end_batch = true; - } - let batch_id = self.batch_builder.batch_id(); if !end_batch { if !serialized_txns.is_empty() { - self.batch_coordinator_tx - .send(BatchCoordinatorCommand::AppendToBatch( - serialized_txns, - batch_id, - )) - .await - .expect("could not send to QuorumStore"); + let fragment = Fragment::new( + self.epoch, + batch_id, + self.batch_builder.fetch_and_increment_fragment_id(), + serialized_txns, + None, + self.my_peer_id, + ); + return Some(fragment); } - None } else { if self.batch_builder.is_empty() { // Quorum store metrics @@ -179,7 +166,6 @@ impl BatchGenerator { .observe_duration(Duration::from_secs_f64(duration)); self.last_end_batch_time = Instant::now(); - return None; } @@ -197,67 +183,38 @@ impl BatchGenerator { .save_batch_id(self.latest_logical_time.epoch(), incremented_batch_id) .expect("Could not save to db"); - let (proof_tx, proof_rx) = oneshot::channel(); let expiry_round = self.latest_logical_time.round() + self.config.batch_expiry_round_gap_when_init; let logical_time = LogicalTime::new(self.latest_logical_time.epoch(), expiry_round); - self.batch_coordinator_tx - .send(BatchCoordinatorCommand::EndBatch( - serialized_txns, - batch_id, - logical_time, - proof_tx, - )) - .await - .expect("could not send to QuorumStore"); + let fragment = Fragment::new( + self.epoch, + batch_id, + self.batch_builder.fetch_and_increment_fragment_id(), + serialized_txns, + Some(logical_time), + self.my_peer_id, + ); self.batches_in_progress .insert(batch_id, self.batch_builder.take_summaries()); - self.batch_expirations.add_item(batch_id, expiry_round); + self.batch_round_expirations + .add_item(batch_id, expiry_round); self.last_end_batch_time = Instant::now(); - - Some(proof_rx) - } - } - - pub(crate) async fn handle_completed_proof( - &mut self, - msg: Result<(ProofOfStore, BatchId), ProofError>, - ) { - match msg { - Ok((proof, batch_id)) => { - trace!( - "QS: received proof of store for batch id {}, digest {}", - batch_id, - proof.digest(), - ); - - counters::LOCAL_POS_COUNT.inc(); - }, - Err(ProofError::Timeout(batch_id)) => { - // Quorum store measurements - counters::TIMEOUT_BATCHES_COUNT.inc(); - - debug!( - "QS: received timeout for proof of store, batch id = {}", - batch_id - ); - // Not able to gather the proof, allow transactions to be polled again. 
- self.batches_in_progress.remove(&batch_id); - }, + return Some(fragment); } + None } pub async fn start( mut self, + mut network_sender: NetworkSender, mut cmd_rx: tokio::sync::mpsc::Receiver, mut back_pressure_rx: tokio::sync::mpsc::Receiver, mut interval: Interval, ) { let start = Instant::now(); - let mut proofs_in_progress: FuturesUnordered> = FuturesUnordered::new(); let mut last_non_empty_pull = start; let back_pressure_decrease_duration = @@ -279,6 +236,7 @@ impl BatchGenerator { self.back_pressure = updated_back_pressure; }, _ = interval.tick() => monitor!("batch_generator_handle_tick", { + let now = Instant::now(); // TODO: refactor back_pressure logic into its own function if self.back_pressure.txn_count { @@ -321,17 +279,9 @@ impl BatchGenerator { let dynamic_pull_max_txn = std::cmp::max( (since_last_non_empty_pull_ms as f64 / 1000.0 * dynamic_pull_txn_per_s as f64) as u64, 1); - if let Some(proof_rx) = self.handle_scheduled_pull(dynamic_pull_max_txn).await { + if let Some(fragment) = self.handle_scheduled_pull(dynamic_pull_max_txn).await { last_non_empty_pull = now; - proofs_in_progress.push(Box::pin(proof_rx)); - } - } - }), - Some(next_proof) = proofs_in_progress.next() => monitor!("batch_generator_handle_proof", { - match next_proof { - Ok(proof) => self.handle_completed_proof(proof).await, - Err(_) => { - debug!("QS: proof oneshot dropped"); + network_sender.broadcast_fragment(fragment).await; } } }), @@ -355,16 +305,29 @@ impl BatchGenerator { self.latest_logical_time = logical_time; // Cleans up all batches that expire in rounds <= logical_time.round(). This is // safe since clean request must occur only after execution result is certified. - for batch_id in self.batch_expirations.expire(logical_time.round()) { + for batch_id in self.batch_round_expirations.expire(logical_time.round()) { if self.batches_in_progress.remove(&batch_id).is_some() { debug!( - "QS: expired batch w. id {} from batches_in_progress, new size {}", + "QS: logical time based expiration batch w. id {} from batches_in_progress, new size {}", batch_id, self.batches_in_progress.len(), ); } } }, + BatchGeneratorCommand::ProofExpiration(batch_ids) => { + // Quorum store measurements + counters::TIMEOUT_BATCHES_COUNT.inc(); + + for batch_id in batch_ids { + debug!( + "QS: received timeout for proof of store, batch id = {}", + batch_id + ); + // Not able to gather the proof, allow transactions to be polled again. 
+ self.batches_in_progress.remove(&batch_id); + } + } BatchGeneratorCommand::Shutdown(ack_tx) => { ack_tx .send(()) diff --git a/consensus/src/quorum_store/batch_store.rs b/consensus/src/quorum_store/batch_store.rs index d760cc1cd6bdb..c9e01de57b14a 100644 --- a/consensus/src/quorum_store/batch_store.rs +++ b/consensus/src/quorum_store/batch_store.rs @@ -11,7 +11,7 @@ use crate::{ use anyhow::bail; use aptos_consensus_types::{ common::Round, - proof_of_store::{LogicalTime, ProofOfStore, SignedDigest}, + proof_of_store::{BatchId, LogicalTime, ProofOfStore, SignedDigest}, }; use aptos_crypto::HashValue; use aptos_executor_types::Error; @@ -42,6 +42,7 @@ pub struct PersistRequest { impl PersistRequest { pub fn new( author: PeerId, + batch_id: BatchId, payload: Vec, digest_hash: HashValue, num_bytes: usize, @@ -49,7 +50,7 @@ impl PersistRequest { ) -> Self { Self { digest: digest_hash, - value: PersistedValue::new(Some(payload), expiration, author, num_bytes), + value: PersistedValue::new(Some(payload), expiration, author, batch_id, num_bytes), } } } @@ -367,24 +368,24 @@ impl BatchStore { let num_txns = persist_request.value.maybe_payload.as_ref().unwrap().len() as u64; let num_bytes = persist_request.value.num_bytes as u64; let batch_author = persist_request.value.author; + let batch_id = persist_request.value.batch_id; trace!("QS: sign digest {}", persist_request.digest); if needs_db { self.db .save_batch(persist_request.digest, persist_request.value) .expect("Could not write to DB"); } - Some( - SignedDigest::new( - batch_author, - self.epoch(), - persist_request.digest, - expiration, - num_txns, - num_bytes, - &self.validator_signer, - ) - .unwrap(), + SignedDigest::new( + batch_author, + batch_id, + self.epoch(), + persist_request.digest, + expiration, + num_txns, + num_bytes, + &self.validator_signer, ) + .ok() }, Err(e) => { @@ -432,30 +433,19 @@ impl BatchStore { self.last_certified_round.load(Ordering::Relaxed) } - fn get_batch_from_db(&self, digest: &HashValue) -> Result, Error> { + fn get_batch_from_db(&self, digest: &HashValue) -> Result { counters::GET_BATCH_FROM_DB_COUNT.inc(); match self.db.get_batch(digest) { - Ok(Some(persisted_value)) => { - let payload = persisted_value - .maybe_payload - .expect("Persisted value in QuorumStore DB must have payload"); - return Ok(payload); - }, - Ok(None) => { - unreachable!("Could not read persisted value (according to BatchReader) from DB") - }, - Err(_) => { - // TODO: handle error, e.g. from self or not, log, panic. + Ok(Some(persisted_value)) => Ok(persisted_value), + Ok(None) | Err(_) => { + error!("Could not get batch from db"); + Err(Error::CouldNotGetData) }, } - Err(Error::CouldNotGetData) } - pub fn get_batch_from_local( - &self, - digest: &HashValue, - ) -> Result, Error> { + pub fn get_batch_from_local(&self, digest: &HashValue) -> Result { if let Some(value) = self.db_cache.get(digest) { if payload_storage_mode(&value) == StorageMode::PersistedOnly { assert!( @@ -465,24 +455,38 @@ impl BatchStore { self.get_batch_from_db(digest) } else { // Available in memory. 
- Ok(value - .maybe_payload - .clone() - .expect("BatchReader payload and storage kind mismatch")) + Ok(value.clone()) } } else { Err(Error::CouldNotGetData) } } +} + +pub trait BatchReader: Send + Sync { + /// Check if the batch corresponding to the digest exists, return the batch author if true + fn exists(&self, digest: &HashValue) -> Option; + + fn get_batch( + &self, + proof: ProofOfStore, + ) -> oneshot::Receiver, Error>>; +} + +impl BatchReader for BatchStore { + fn exists(&self, digest: &HashValue) -> Option { + self.get_batch_from_local(digest).map(|v| v.author).ok() + } - pub fn get_batch( + fn get_batch( &self, proof: ProofOfStore, ) -> oneshot::Receiver, Error>> { let (tx, rx) = oneshot::channel(); if let Ok(value) = self.get_batch_from_local(proof.digest()) { - tx.send(Ok(value)).unwrap(); + tx.send(Ok(value.maybe_payload.expect("Must have payload"))) + .unwrap(); } else { // Quorum store metrics counters::MISSED_BATCHES_COUNT.inc(); diff --git a/consensus/src/quorum_store/network_listener.rs b/consensus/src/quorum_store/network_listener.rs index c277371c6b87e..09762006423dc 100644 --- a/consensus/src/quorum_store/network_listener.rs +++ b/consensus/src/quorum_store/network_listener.rs @@ -71,13 +71,12 @@ impl NetworkListener { idx ); self.remote_batch_coordinator_tx[idx] - .send(BatchCoordinatorCommand::RemoteFragment(fragment)) + .send(BatchCoordinatorCommand::AppendFragment(fragment)) .await .expect("Could not send remote fragment"); }, VerifiedEvent::ProofOfStoreMsg(proof) => { - counters::REMOTE_POS_COUNT.inc(); - let cmd = ProofManagerCommand::RemoteProof(*proof); + let cmd = ProofManagerCommand::ReceiveProof(*proof); self.proof_manager_tx .send(cmd) .await @@ -87,7 +86,7 @@ impl NetworkListener { unreachable!() }, }; - }) + }); } } } diff --git a/consensus/src/quorum_store/proof_coordinator.rs b/consensus/src/quorum_store/proof_coordinator.rs index e88f4d4927213..f99503d1f717e 100644 --- a/consensus/src/quorum_store/proof_coordinator.rs +++ b/consensus/src/quorum_store/proof_coordinator.rs @@ -3,9 +3,9 @@ use crate::{ monitor, + network::QuorumStoreSender, quorum_store::{ - batch_generator::ProofError, counters, proof_manager::ProofManagerCommand, types::BatchId, - utils::DigestTimeouts, + batch_generator::BatchGeneratorCommand, batch_store::BatchReader, counters, utils::Timeouts, }, }; use aptos_consensus_types::proof_of_store::{ @@ -16,42 +16,32 @@ use aptos_logger::prelude::*; use aptos_types::{ aggregate_signature::PartialSignatures, validator_verifier::ValidatorVerifier, PeerId, }; -use futures::channel::oneshot; use std::{ - collections::{BTreeMap, HashMap}, + collections::{hash_map::Entry, BTreeMap, HashMap}, + sync::Arc, time::Duration, }; use tokio::{ - sync::{ - mpsc::{Receiver, Sender}, - oneshot as TokioOneshot, - }, + sync::{mpsc::Receiver, oneshot as TokioOneshot}, time, }; #[derive(Debug)] pub(crate) enum ProofCoordinatorCommand { - InitProof(SignedDigestInfo, BatchId, ProofReturnChannel), AppendSignature(SignedDigest), Shutdown(TokioOneshot::Sender<()>), } -pub(crate) type ProofReturnChannel = oneshot::Sender>; - struct IncrementalProofState { info: SignedDigestInfo, aggregated_signature: BTreeMap, - batch_id: BatchId, - ret_tx: ProofReturnChannel, } impl IncrementalProofState { - fn new(info: SignedDigestInfo, batch_id: BatchId, ret_tx: ProofReturnChannel) -> Self { + fn new(info: SignedDigestInfo) -> Self { Self { info, aggregated_signature: BTreeMap::new(), - batch_id, - ret_tx, } } @@ -72,34 +62,20 @@ impl IncrementalProofState { Ok(()) } - fn 
ready(&self, validator_verifier: &ValidatorVerifier, my_peer_id: PeerId) -> bool { - self.aggregated_signature.contains_key(&my_peer_id) - && validator_verifier - .check_voting_power(self.aggregated_signature.keys()) - .is_ok() + fn ready(&self, validator_verifier: &ValidatorVerifier) -> bool { + validator_verifier + .check_voting_power(self.aggregated_signature.keys()) + .is_ok() } - fn take( - self, - validator_verifier: &ValidatorVerifier, - ) -> (ProofOfStore, BatchId, ProofReturnChannel) { + fn take(self, validator_verifier: &ValidatorVerifier) -> ProofOfStore { let proof = match validator_verifier .aggregate_signatures(&PartialSignatures::new(self.aggregated_signature)) { Ok(sig) => ProofOfStore::new(self.info, sig), Err(e) => unreachable!("Cannot aggregate signatures on digest err = {:?}", e), }; - (proof, self.batch_id, self.ret_tx) - } - - fn send_timeout(self) { - if self - .ret_tx - .send(Err(ProofError::Timeout(self.batch_id))) - .is_err() - { - debug!("Failed to send timeout for batch {}", self.batch_id); - } + proof } } @@ -109,30 +85,53 @@ pub(crate) struct ProofCoordinator { digest_to_proof: HashMap, digest_to_time: HashMap, // to record the batch creation time - timeouts: DigestTimeouts, + timeouts: Timeouts, + batch_reader: Arc, + batch_generator_cmd_tx: tokio::sync::mpsc::Sender, } //PoQS builder object - gather signed digest to form PoQS impl ProofCoordinator { - pub fn new(proof_timeout_ms: usize, peer_id: PeerId) -> Self { + pub fn new( + proof_timeout_ms: usize, + peer_id: PeerId, + batch_reader: Arc, + batch_generator_cmd_tx: tokio::sync::mpsc::Sender, + ) -> Self { Self { peer_id, proof_timeout_ms, digest_to_proof: HashMap::new(), digest_to_time: HashMap::new(), - timeouts: DigestTimeouts::new(), + timeouts: Timeouts::new(), + batch_reader, + batch_generator_cmd_tx, } } - fn init_proof(&mut self, info: SignedDigestInfo, batch_id: BatchId, tx: ProofReturnChannel) { - self.timeouts.add_digest(info.digest, self.proof_timeout_ms); + fn init_proof(&mut self, signed_digest: &SignedDigest) -> Result<(), SignedDigestError> { + // Check if the signed digest corresponding to our batch + if signed_digest.info().batch_author != self.peer_id { + return Err(SignedDigestError::WrongAuthor); + } + let batch_author = self + .batch_reader + .exists(&signed_digest.digest()) + .ok_or(SignedDigestError::WrongAuthor)?; + if batch_author != signed_digest.info().batch_author { + return Err(SignedDigestError::WrongAuthor); + } + + self.timeouts + .add(signed_digest.info().clone(), self.proof_timeout_ms); self.digest_to_proof.insert( - info.digest, - IncrementalProofState::new(info.clone(), batch_id, tx), + signed_digest.digest(), + IncrementalProofState::new(signed_digest.info().clone()), ); self.digest_to_time - .entry(info.digest) + .entry(signed_digest.digest()) .or_insert(chrono::Utc::now().naive_utc().timestamp_micros() as u64); + Ok(()) } fn add_signature( @@ -141,58 +140,53 @@ impl ProofCoordinator { validator_verifier: &ValidatorVerifier, ) -> Result, SignedDigestError> { if !self.digest_to_proof.contains_key(&signed_digest.digest()) { - return Err(SignedDigestError::WrongInfo); + self.init_proof(&signed_digest)?; } - let mut ret = Ok(()); - let mut proof_changed_to_completed = false; let digest = signed_digest.digest(); - let my_id = self.peer_id; - self.digest_to_proof - .entry(signed_digest.digest()) - .and_modify(|state| { - ret = state.add_signature(signed_digest); - if ret.is_ok() { - proof_changed_to_completed = state.ready(validator_verifier, my_id); - } - }); - if 
proof_changed_to_completed { - let (proof, batch_id, tx) = self - .digest_to_proof - .remove(&digest) - .unwrap() - .take(validator_verifier); - - // quorum store measurements - let duration = chrono::Utc::now().naive_utc().timestamp_micros() as u64 - - self - .digest_to_time - .get(&digest) - .expect("Batch created without recording the time!"); - counters::BATCH_TO_POS_DURATION.observe_duration(Duration::from_micros(duration)); - - // TODO: just send back an ack - if tx.send(Ok((proof.clone(), batch_id))).is_err() { - debug!("Failed to send back completion for batch {}", batch_id); - } - Ok(Some(proof)) - } else { - Ok(None) + match self.digest_to_proof.entry(signed_digest.digest()) { + Entry::Occupied(mut entry) => { + entry.get_mut().add_signature(signed_digest)?; + if entry.get_mut().ready(validator_verifier) { + let (_, state) = entry.remove_entry(); + let proof = state.take(validator_verifier); + // quorum store measurements + let duration = chrono::Utc::now().naive_utc().timestamp_micros() as u64 + - self + .digest_to_time + .remove(&digest) + .expect("Batch created without recording the time!"); + counters::BATCH_TO_POS_DURATION + .observe_duration(Duration::from_micros(duration)); + return Ok(Some(proof)); + } + }, + Entry::Vacant(_) => (), } + Ok(None) } - fn expire(&mut self) { - for digest in self.timeouts.expire() { - if let Some(state) = self.digest_to_proof.remove(&digest) { - state.send_timeout(); - } + async fn expire(&mut self) { + let mut batch_ids = vec![]; + for signed_digest_info in self.timeouts.expire() { + counters::TIMEOUT_BATCHES_COUNT.inc(); + self.digest_to_proof.remove(&signed_digest_info.digest); + batch_ids.push(signed_digest_info.batch_id); + } + if self + .batch_generator_cmd_tx + .send(BatchGeneratorCommand::ProofExpiration(batch_ids)) + .await + .is_err() + { + warn!("Failed to send proof expiration to batch generator"); } } pub async fn start( mut self, mut rx: Receiver, - tx: Sender, + mut network_sender: impl QuorumStoreSender, validator_verifier: ValidatorVerifier, ) { let mut interval = time::interval(Duration::from_millis(100)); @@ -206,10 +200,6 @@ impl ProofCoordinator { .expect("Failed to send shutdown ack to QuorumStore"); break; }, - ProofCoordinatorCommand::InitProof(info, batch_id, tx) => { - debug!("QS: init proof, batch_id {}, digest {}", batch_id, info.digest); - self.init_proof(info, batch_id, tx); - }, ProofCoordinatorCommand::AppendSignature(signed_digest) => { let peer_id = signed_digest.signer(); let digest = signed_digest.digest(); @@ -217,7 +207,7 @@ impl ProofCoordinator { Ok(result) => { if let Some(proof) = result { debug!("QS: received quorum of signatures, digest {}", digest); - tx.send(ProofManagerCommand::LocalProof(proof)).await.unwrap(); + network_sender.broadcast_proof_of_store(proof).await; } }, Err(e) => { @@ -232,7 +222,7 @@ impl ProofCoordinator { } }), _ = interval.tick() => { - monitor!("proof_coordinator_handle_tick", self.expire()); + monitor!("proof_coordinator_handle_tick", self.expire().await); } } } diff --git a/consensus/src/quorum_store/proof_manager.rs b/consensus/src/quorum_store/proof_manager.rs index 49f0bac6dac47..bdbae665cb5fa 100644 --- a/consensus/src/quorum_store/proof_manager.rs +++ b/consensus/src/quorum_store/proof_manager.rs @@ -3,7 +3,6 @@ use crate::{ monitor, - network::{NetworkSender, QuorumStoreSender}, quorum_store::{batch_generator::BackPressure, counters, utils::ProofQueue}, }; use aptos_consensus_types::{ @@ -13,19 +12,20 @@ use aptos_consensus_types::{ }; use aptos_crypto::HashValue; 
use aptos_logger::prelude::*; +use aptos_types::PeerId; use futures::StreamExt; use futures_channel::mpsc::Receiver; use std::collections::HashSet; #[derive(Debug)] pub enum ProofManagerCommand { - LocalProof(ProofOfStore), - RemoteProof(ProofOfStore), + ReceiveProof(ProofOfStore), CommitNotification(LogicalTime, Vec), Shutdown(tokio::sync::oneshot::Sender<()>), } pub struct ProofManager { + my_peer_id: PeerId, proofs_for_consensus: ProofQueue, latest_logical_time: LogicalTime, back_pressure_total_txn_limit: u64, @@ -37,10 +37,12 @@ pub struct ProofManager { impl ProofManager { pub fn new( epoch: u64, + my_peer_id: PeerId, back_pressure_total_txn_limit: u64, back_pressure_total_proof_limit: u64, ) -> Self { Self { + my_peer_id, proofs_for_consensus: ProofQueue::new(), latest_logical_time: LogicalTime::new(epoch, 0), back_pressure_total_txn_limit, @@ -56,21 +58,11 @@ impl ProofManager { self.remaining_total_proof_num += 1; } - pub(crate) async fn handle_local_proof( - &mut self, - proof: ProofOfStore, - network_sender: &mut NetworkSender, - ) { + pub(crate) fn receive_proof(&mut self, proof: ProofOfStore) { + let is_local = proof.info().batch_author == self.my_peer_id; let num_txns = proof.info().num_txns; - self.proofs_for_consensus.push(proof.clone(), true); - self.increment_remaining_txns(num_txns); - network_sender.broadcast_proof_of_store(proof).await; - } - - pub(crate) fn handle_remote_proof(&mut self, proof: ProofOfStore) { - let num_txns = proof.info().num_txns; - self.proofs_for_consensus.push(proof, false); self.increment_remaining_txns(num_txns); + self.proofs_for_consensus.push(proof, is_local); } pub(crate) fn handle_commit_notification( @@ -160,7 +152,6 @@ impl ProofManager { pub async fn start( mut self, - mut network_sender: NetworkSender, back_pressure_tx: tokio::sync::mpsc::Sender, mut proposal_rx: Receiver, mut proof_rx: tokio::sync::mpsc::Receiver, @@ -195,11 +186,8 @@ impl ProofManager { .expect("Failed to send shutdown ack to QuorumStore"); break; }, - ProofManagerCommand::LocalProof(proof) => { - self.handle_local_proof(proof, &mut network_sender).await; - }, - ProofManagerCommand::RemoteProof(proof) => { - self.handle_remote_proof(proof); + ProofManagerCommand::ReceiveProof(proof) => { + self.receive_proof(proof); }, ProofManagerCommand::CommitNotification(logical_time, digests) => { self.handle_commit_notification(logical_time, digests); diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index ae0229b6c7b4a..bc7e63d1d3932 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -26,7 +26,6 @@ use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::config::{QuorumStoreConfig, SecureBackend}; use aptos_consensus_types::{common::Author, request_response::GetPayloadCommand}; use aptos_global_constants::CONSENSUS_KEY; -use aptos_infallible::Mutex; use aptos_logger::prelude::*; use aptos_mempool::QuorumStoreRequest; use aptos_secure_storage::{KVStorage, Storage}; @@ -128,8 +127,6 @@ pub struct InnerBuilder { coordinator_rx: Option>, batch_generator_cmd_tx: tokio::sync::mpsc::Sender, batch_generator_cmd_rx: Option>, - batch_coordinator_cmd_tx: tokio::sync::mpsc::Sender, - batch_coordinator_cmd_rx: Option>, proof_coordinator_cmd_tx: tokio::sync::mpsc::Sender, proof_coordinator_cmd_rx: Option>, proof_manager_cmd_tx: tokio::sync::mpsc::Sender, @@ -162,8 +159,6 @@ impl InnerBuilder { let (coordinator_tx, 
coordinator_rx) = futures_channel::mpsc::channel(config.channel_size); let (batch_generator_cmd_tx, batch_generator_cmd_rx) = tokio::sync::mpsc::channel(config.channel_size); - let (batch_coordinator_cmd_tx, batch_coordinator_cmd_rx) = - tokio::sync::mpsc::channel(config.channel_size); let (proof_coordinator_cmd_tx, proof_coordinator_cmd_rx) = tokio::sync::mpsc::channel(config.channel_size); let (proof_manager_cmd_tx, proof_manager_cmd_rx) = @@ -200,8 +195,6 @@ impl InnerBuilder { coordinator_rx: Some(coordinator_rx), batch_generator_cmd_tx, batch_generator_cmd_rx: Some(batch_generator_cmd_rx), - batch_coordinator_cmd_tx, - batch_coordinator_cmd_rx: Some(batch_coordinator_cmd_rx), proof_coordinator_cmd_tx, proof_coordinator_cmd_rx: Some(proof_coordinator_cmd_rx), proof_manager_cmd_tx, @@ -288,7 +281,6 @@ impl InnerBuilder { let quorum_store_coordinator = QuorumStoreCoordinator::new( self.author, self.batch_generator_cmd_tx.clone(), - self.batch_coordinator_cmd_tx.clone(), self.remote_batch_coordinator_cmd_tx.clone(), self.proof_coordinator_cmd_tx.clone(), self.proof_manager_cmd_tx.clone(), @@ -303,28 +295,21 @@ impl InnerBuilder { let back_pressure_rx = self.back_pressure_rx.take().unwrap(); let batch_generator = BatchGenerator::new( self.epoch, + self.author, self.config.clone(), self.quorum_store_storage.clone(), self.quorum_store_to_mempool_sender, - self.batch_coordinator_cmd_tx.clone(), self.mempool_txn_pull_timeout_ms, ); spawn_named!( "batch_generator", - batch_generator.start(batch_generator_cmd_rx, back_pressure_rx, interval) - ); - - let batch_coordinator_cmd_rx = self.batch_coordinator_cmd_rx.take().unwrap(); - let batch_coordinator = BatchCoordinator::new( - self.epoch, - self.author, - self.network_sender.clone(), - batch_coordinator_cmd_rx, - self.batch_store.clone().unwrap(), - self.proof_coordinator_cmd_tx.clone(), - self.config.max_batch_bytes, + batch_generator.start( + self.network_sender.clone(), + batch_generator_cmd_rx, + back_pressure_rx, + interval + ) ); - spawn_named!("batch_coordinator", batch_coordinator.start()); for (i, remote_batch_coordinator_cmd_rx) in self.remote_batch_coordinator_cmd_rx.into_iter().enumerate() @@ -333,23 +318,29 @@ impl InnerBuilder { self.epoch, self.author, self.network_sender.clone(), - remote_batch_coordinator_cmd_rx, self.batch_store.clone().unwrap(), - self.proof_coordinator_cmd_tx.clone(), self.config.max_batch_bytes, ); #[allow(unused_variables)] let name = format!("batch_coordinator-{}", i).as_str(); - spawn_named!(name, batch_coordinator.start()); + spawn_named!( + name, + batch_coordinator.start(remote_batch_coordinator_cmd_rx) + ); } let proof_coordinator_cmd_rx = self.proof_coordinator_cmd_rx.take().unwrap(); - let proof_coordinator = ProofCoordinator::new(self.config.proof_timeout_ms, self.author); + let proof_coordinator = ProofCoordinator::new( + self.config.proof_timeout_ms, + self.author, + self.batch_store.clone().unwrap(), + self.batch_generator_cmd_tx.clone(), + ); spawn_named!( "proof_coordinator", proof_coordinator.start( proof_coordinator_cmd_rx, - self.proof_manager_cmd_tx.clone(), + self.network_sender.clone(), self.verifier.clone(), ) ); @@ -357,6 +348,7 @@ impl InnerBuilder { let proof_manager_cmd_rx = self.proof_manager_cmd_rx.take().unwrap(); let proof_manager = ProofManager::new( self.epoch, + self.author, self.config.back_pressure.backlog_txn_limit_count, self.config .back_pressure @@ -366,7 +358,6 @@ impl InnerBuilder { spawn_named!( "proof_manager", proof_manager.start( - self.network_sender.clone(), 
self.back_pressure_tx.clone(), self.consensus_to_quorum_store_receiver, proof_manager_cmd_rx, @@ -396,7 +387,12 @@ impl InnerBuilder { while let Some(rpc_request) = batch_retrieval_rx.next().await { counters::RECEIVED_BATCH_REQUEST_COUNT.inc(); if let Ok(value) = batch_store.get_batch_from_local(&rpc_request.req.digest()) { - let batch = Batch::new(author, epoch, rpc_request.req.digest(), value); + let batch = Batch::new( + author, + epoch, + rpc_request.req.digest(), + value.maybe_payload.expect("Must have payload"), + ); let msg = ConsensusMsg::BatchMsg(Box::new(batch)); let bytes = rpc_request.protocol.to_bytes(&msg).unwrap(); if let Err(e) = rpc_request @@ -426,7 +422,7 @@ impl InnerBuilder { Arc::from(PayloadManager::InQuorumStore( batch_store, // TODO: remove after splitting out clean requests - Mutex::new(self.coordinator_tx.clone()), + self.coordinator_tx.clone(), )), Some(self.quorum_store_msg_tx.clone()), ) diff --git a/consensus/src/quorum_store/quorum_store_coordinator.rs b/consensus/src/quorum_store/quorum_store_coordinator.rs index 6639c3d52179e..09571409d16c5 100644 --- a/consensus/src/quorum_store/quorum_store_coordinator.rs +++ b/consensus/src/quorum_store/quorum_store_coordinator.rs @@ -25,7 +25,6 @@ pub enum CoordinatorCommand { pub struct QuorumStoreCoordinator { my_peer_id: PeerId, batch_generator_cmd_tx: mpsc::Sender, - batch_coordinator_cmd_tx: mpsc::Sender, remote_batch_coordinator_cmd_tx: Vec>, proof_coordinator_cmd_tx: mpsc::Sender, proof_manager_cmd_tx: mpsc::Sender, @@ -36,7 +35,6 @@ impl QuorumStoreCoordinator { pub(crate) fn new( my_peer_id: PeerId, batch_generator_cmd_tx: mpsc::Sender, - batch_coordinator_cmd_tx: mpsc::Sender, remote_batch_coordinator_cmd_tx: Vec>, proof_coordinator_cmd_tx: mpsc::Sender, proof_manager_cmd_tx: mpsc::Sender, @@ -45,7 +43,6 @@ impl QuorumStoreCoordinator { Self { my_peer_id, batch_generator_cmd_tx, - batch_coordinator_cmd_tx, remote_batch_coordinator_cmd_tx, proof_coordinator_cmd_tx, proof_manager_cmd_tx, @@ -103,18 +100,6 @@ impl QuorumStoreCoordinator { .await .expect("Failed to stop BatchGenerator"); - let (batch_coordinator_shutdown_tx, batch_coordinator_shutdown_rx) = - oneshot::channel(); - self.batch_coordinator_cmd_tx - .send(BatchCoordinatorCommand::Shutdown( - batch_coordinator_shutdown_tx, - )) - .await - .expect("Failed to send to BatchCoordinator"); - batch_coordinator_shutdown_rx - .await - .expect("Failed to stop BatchCoordinator"); - for remote_batch_coordinator_cmd_tx in self.remote_batch_coordinator_cmd_tx { let ( diff --git a/consensus/src/quorum_store/quorum_store_db.rs b/consensus/src/quorum_store/quorum_store_db.rs index 8606e87e55380..1cdc5898eca9e 100644 --- a/consensus/src/quorum_store/quorum_store_db.rs +++ b/consensus/src/quorum_store/quorum_store_db.rs @@ -5,10 +5,11 @@ use crate::{ error::DbError, quorum_store::{ schema::{BatchIdSchema, BatchSchema, BATCH_CF_NAME, BATCH_ID_CF_NAME}, - types::{BatchId, PersistedValue}, + types::PersistedValue, }, }; use anyhow::Result; +use aptos_consensus_types::proof_of_store::BatchId; use aptos_crypto::HashValue; use aptos_logger::prelude::*; use aptos_schemadb::{Options, ReadOptions, SchemaBatch, DB}; @@ -26,6 +27,7 @@ pub(crate) trait QuorumStoreStorage: Sync + Send { fn delete_batch_id(&self, epoch: u64) -> Result<(), DbError>; fn clean_and_get_batch_id(&self, current_epoch: u64) -> Result, DbError>; + fn save_batch_id(&self, epoch: u64, batch_id: BatchId) -> Result<(), DbError>; } diff --git a/consensus/src/quorum_store/schema.rs 
b/consensus/src/quorum_store/schema.rs index 3c0382ec1dee2..f6213c463c9c0 100644 --- a/consensus/src/quorum_store/schema.rs +++ b/consensus/src/quorum_store/schema.rs @@ -1,8 +1,9 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::quorum_store::types::{BatchId, PersistedValue}; +use crate::quorum_store::types::PersistedValue; use anyhow::Result; +use aptos_consensus_types::proof_of_store::BatchId; use aptos_crypto::HashValue; use aptos_schemadb::{ schema::{KeyCodec, Schema, ValueCodec}, diff --git a/consensus/src/quorum_store/tests/batch_aggregator_test.rs b/consensus/src/quorum_store/tests/batch_aggregator_test.rs index 4fc65fc32f99e..2806a4bbff0ea 100644 --- a/consensus/src/quorum_store/tests/batch_aggregator_test.rs +++ b/consensus/src/quorum_store/tests/batch_aggregator_test.rs @@ -4,8 +4,9 @@ use crate::quorum_store::{ batch_aggregator::{BatchAggregationError, BatchAggregator, IncrementalBatchState}, tests::utils::create_vec_serialized_transactions, - types::{BatchId, SerializedTransaction}, + types::SerializedTransaction, }; +use aptos_consensus_types::proof_of_store::BatchId; use aptos_types::transaction::SignedTransaction; use claims::{assert_ge, assert_matches, assert_ok, assert_ok_eq}; diff --git a/consensus/src/quorum_store/tests/batch_generator_test.rs b/consensus/src/quorum_store/tests/batch_generator_test.rs index 53fd3ecb0a707..66fa17c44d7e2 100644 --- a/consensus/src/quorum_store/tests/batch_generator_test.rs +++ b/consensus/src/quorum_store/tests/batch_generator_test.rs @@ -6,16 +6,17 @@ use crate::quorum_store::{ batch_generator::BatchGenerator, quorum_store_db::MockQuorumStoreDB, tests::utils::{create_vec_serialized_transactions, create_vec_signed_transactions}, - types::{BatchId, SerializedTransaction}, + types::SerializedTransaction, }; use aptos_config::config::QuorumStoreConfig; -use aptos_consensus_types::common::TransactionSummary; +use aptos_consensus_types::{common::TransactionSummary, proof_of_store::BatchId}; use aptos_mempool::{QuorumStoreRequest, QuorumStoreResponse}; use aptos_types::transaction::SignedTransaction; use futures::{ channel::mpsc::{channel, Receiver}, StreamExt, }; +use move_core_types::account_address::AccountAddress; use std::{sync::Arc, time::Duration}; use tokio::{sync::mpsc::channel as TokioChannel, time::timeout}; @@ -62,10 +63,10 @@ async fn test_batch_creation() { let mut batch_generator = BatchGenerator::new( 0, + AccountAddress::random(), config, Arc::new(MockQuorumStoreDB::new()), quorum_store_to_mempool_tx, - batch_coordinator_cmd_tx, 1000, ); @@ -87,10 +88,11 @@ async fn test_batch_creation() { .await; // Expect AppendToBatch for 1 txn let quorum_store_command = batch_coordinator_cmd_rx.recv().await.unwrap(); - if let BatchCoordinatorCommand::AppendToBatch(data, batch_id) = quorum_store_command { - assert_eq!(batch_id, BatchId::new_for_test(1)); - assert_eq!(data.len(), signed_txns.len()); - assert_eq!(data, serialize(&signed_txns)); + if let BatchCoordinatorCommand::AppendFragment(data) = quorum_store_command { + assert_eq!(data.batch_id(), BatchId::new_for_test(1)); + let txns = data.into_transactions(); + assert_eq!(txns.len(), signed_txns.len()); + assert_eq!(txns, serialize(&signed_txns)); } else { panic!("Unexpected variant") } @@ -104,9 +106,10 @@ async fn test_batch_creation() { assert_eq!(exclude_txns.len(), num_txns); // Expect EndBatch for 1 + 9 = 10 txns. The last txn pulled is not included in the batch. 
let quorum_store_command = batch_coordinator_cmd_rx.recv().await.unwrap(); - if let BatchCoordinatorCommand::EndBatch(data, _, _, _) = quorum_store_command { - assert_eq!(data.len(), signed_txns.len() - 1); - assert_eq!(data, serialize(&signed_txns[0..8].to_vec())); + if let BatchCoordinatorCommand::AppendFragment(data) = quorum_store_command { + let txns = data.into_transactions(); + assert_eq!(txns.len(), signed_txns.len() - 1); + assert_eq!(txns, serialize(&signed_txns[0..8].to_vec())); } else { panic!("Unexpected variant") } @@ -120,22 +123,23 @@ async fn test_batch_creation() { assert_eq!(exclude_txns.len(), num_txns); // Expect AppendBatch for 9 txns let quorum_store_command = batch_coordinator_cmd_rx.recv().await.unwrap(); - if let BatchCoordinatorCommand::AppendToBatch(data, batch_id) = quorum_store_command { - assert_eq!(batch_id, BatchId::new_for_test(2)); - assert_eq!(data.len(), signed_txns.len()); - assert_eq!(data, serialize(&signed_txns)); + if let BatchCoordinatorCommand::AppendFragment(data) = quorum_store_command { + assert_eq!(data.batch_id(), BatchId::new_for_test(2)); + let txns = data.into_transactions(); + assert_eq!(txns.len(), signed_txns.len()); + assert_eq!(txns, serialize(&signed_txns)); } else { panic!("Unexpected variant") } }); - let result = batch_generator.handle_scheduled_pull(300).await; - assert!(result.is_none()); - let result = batch_generator.handle_scheduled_pull(300).await; - assert!(result.is_some()); - let result = batch_generator.handle_scheduled_pull(300).await; - assert!(result.is_none()); - + for _ in 0..3 { + let result = batch_generator.handle_scheduled_pull(300).await.unwrap(); + batch_coordinator_cmd_tx + .send(BatchCoordinatorCommand::AppendFragment(Box::new(result))) + .await + .unwrap(); + } timeout(Duration::from_millis(10_000), join_handle) .await .unwrap() diff --git a/consensus/src/quorum_store/tests/batch_store_test.rs b/consensus/src/quorum_store/tests/batch_store_test.rs index 432ef54ae60a4..f9986f9aaff94 100644 --- a/consensus/src/quorum_store/tests/batch_store_test.rs +++ b/consensus/src/quorum_store/tests/batch_store_test.rs @@ -8,7 +8,7 @@ use crate::{ }, test_utils::mock_quorum_store_sender::MockQuorumStoreSender, }; -use aptos_consensus_types::proof_of_store::LogicalTime; +use aptos_consensus_types::proof_of_store::{BatchId, LogicalTime}; use aptos_crypto::HashValue; use aptos_temppath::TempPath; use aptos_types::{account_address::AccountAddress, validator_verifier::random_validator_verifier}; @@ -61,6 +61,7 @@ fn test_insert_expire() { Some(Vec::new()), LogicalTime::new(10, 15), // Expiration AccountAddress::random(), + BatchId::new_for_test(1), 10, ), ), @@ -74,6 +75,7 @@ fn test_insert_expire() { Some(Vec::new()), LogicalTime::new(10, 30), // Expiration AccountAddress::random(), + BatchId::new_for_test(1), 10, ), ), @@ -86,6 +88,7 @@ fn test_insert_expire() { Some(Vec::new()), LogicalTime::new(10, 25), // Expiration AccountAddress::random(), + BatchId::new_for_test(1), 10, ), ), @@ -121,6 +124,7 @@ async fn test_extend_expiration_vs_save() { Some(Vec::new()), LogicalTime::new(10, i as u64 + 30), AccountAddress::random(), + BatchId::new_for_test(1), 10, ), ) @@ -133,6 +137,7 @@ async fn test_extend_expiration_vs_save() { Some(Vec::new()), LogicalTime::new(10, i as u64 + 40), AccountAddress::random(), + BatchId::new_for_test(1), 10, ), ) @@ -190,6 +195,7 @@ async fn test_extend_expiration_vs_save() { Some(Vec::new()), LogicalTime::new(10, i as u64 + 30), AccountAddress::random(), + BatchId::new_for_test(1), 10, ), ) diff 
diff --git a/consensus/src/quorum_store/tests/proof_coordinator_test.rs b/consensus/src/quorum_store/tests/proof_coordinator_test.rs
index 4c7e4357c4dd7..5bec293983c6b 100644
--- a/consensus/src/quorum_store/tests/proof_coordinator_test.rs
+++ b/consensus/src/quorum_store/tests/proof_coordinator_test.rs
@@ -1,43 +1,64 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0

-use crate::quorum_store::{
-    batch_generator::ProofError,
-    proof_coordinator::{ProofCoordinator, ProofCoordinatorCommand},
-    proof_manager::ProofManagerCommand,
-    tests::utils::{compute_digest_from_signed_transaction, create_vec_signed_transactions},
-    types::BatchId,
+use crate::{
+    network_interface::ConsensusMsg,
+    quorum_store::{
+        batch_store::BatchReader,
+        proof_coordinator::{ProofCoordinator, ProofCoordinatorCommand},
+        tests::utils::{compute_digest_from_signed_transaction, create_vec_signed_transactions},
+    },
+    test_utils::mock_quorum_store_sender::MockQuorumStoreSender,
 };
-use aptos_consensus_types::proof_of_store::{LogicalTime, SignedDigest, SignedDigestInfo};
-use aptos_types::validator_verifier::random_validator_verifier;
-use futures::channel::oneshot;
-use tokio::sync::mpsc::{channel, error::TryRecvError};
+use aptos_consensus_types::proof_of_store::{BatchId, LogicalTime, ProofOfStore, SignedDigest};
+use aptos_crypto::HashValue;
+use aptos_executor_types::Error;
+use aptos_types::{
+    transaction::SignedTransaction, validator_verifier::random_validator_verifier, PeerId,
+};
+use std::sync::Arc;
+use tokio::sync::{mpsc::channel, oneshot::Receiver};
+
+pub struct MockBatchReader {
+    peer: PeerId,
+}
+
+impl BatchReader for MockBatchReader {
+    fn exists(&self, _digest: &HashValue) -> Option<PeerId> {
+        Some(self.peer)
+    }
+
+    fn get_batch(&self, _proof: ProofOfStore) -> Receiver<Result<Vec<SignedTransaction>, Error>> {
+        unimplemented!();
+    }
+}

 #[tokio::test(flavor = "multi_thread")]
 async fn test_proof_coordinator_basic() {
+    aptos_logger::Logger::init_for_testing();
     let (signers, verifier) = random_validator_verifier(4, None, true);
-    let proof_coordinator = ProofCoordinator::new(100, signers[0].author());
+    let (tx, _rx) = channel(100);
+    let proof_coordinator = ProofCoordinator::new(
+        100,
+        signers[0].author(),
+        Arc::new(MockBatchReader {
+            peer: signers[0].author(),
+        }),
+        tx,
+    );
     let (proof_coordinator_tx, proof_coordinator_rx) = channel(100);
-    let (proof_manager_tx, mut proof_manager_rx) = channel(100);
-    tokio::spawn(proof_coordinator.start(proof_coordinator_rx, proof_manager_tx, verifier.clone()));
+    let (tx, mut rx) = channel(100);
+    let network_sender = MockQuorumStoreSender::new(tx);
+    tokio::spawn(proof_coordinator.start(proof_coordinator_rx, network_sender, verifier.clone()));

     let batch_author = signers[0].author();
+    let batch_id = BatchId::new_for_test(1);
     let digest = compute_digest_from_signed_transaction(create_vec_signed_transactions(100));
-    let signed_digest_info =
-        SignedDigestInfo::new(batch_author, digest, LogicalTime::new(1, 20), 1, 1);
-    let (proof_tx, proof_rx) = oneshot::channel();
-    assert!(proof_coordinator_tx
-        .send(ProofCoordinatorCommand::InitProof(
-            signed_digest_info.clone(),
-            BatchId::new_for_test(0),
-            proof_tx
-        ))
-        .await
-        .is_ok());

     for signer in &signers {
         let signed_digest = SignedDigest::new(
             batch_author,
+            batch_id,
             1,
             digest,
             LogicalTime::new(1, 20),
@@ -52,102 +73,11 @@ async fn test_proof_coordinator_basic() {
             .is_ok());
     }

-    // check normal path
-    let (proof, batch_id) = proof_rx.await.expect("channel dropped").unwrap();
-    assert_eq!(batch_id, BatchId::new_for_test(0));
-    assert_eq!(proof.digest().clone(), digest);
-    assert!(proof.verify(&verifier).is_ok());
-    match proof_manager_rx.recv().await.expect("channel dropped") {
-        ProofManagerCommand::LocalProof(cmd_proof) => assert_eq!(proof, cmd_proof),
+    let proof = match rx.recv().await.expect("channel dropped") {
+        (ConsensusMsg::ProofOfStoreMsg(proof), _) => *proof,
         msg => panic!("Expected LocalProof but received: {:?}", msg),
-    }
-
-    // check that error path
-    let (proof_tx, proof_rx) = oneshot::channel();
-    assert!(proof_coordinator_tx
-        .send(ProofCoordinatorCommand::InitProof(
-            signed_digest_info.clone(),
-            BatchId::new_for_test(4),
-            proof_tx
-        ))
-        .await
-        .is_ok());
-    assert_eq!(
-        proof_rx.await.expect("channel dropped"),
-        Err(ProofError::Timeout(BatchId::new_for_test(4)))
-    );
-    match proof_manager_rx.try_recv() {
-        Err(TryRecvError::Empty) => {},
-        result => panic!("Expected Empty but instead: {:?}", result),
-    }
-
-    // check same digest after expiration
-    let (proof_tx, proof_rx) = oneshot::channel();
-    assert!(proof_coordinator_tx
-        .send(ProofCoordinatorCommand::InitProof(
-            signed_digest_info.clone(),
-            BatchId::new_for_test(4),
-            proof_tx
-        ))
-        .await
-        .is_ok());
-    for signer in &signers {
-        let signed_digest = SignedDigest::new(
-            batch_author,
-            1,
-            digest,
-            LogicalTime::new(1, 20),
-            1,
-            1,
-            signer,
-        )
-        .unwrap();
-        assert!(proof_coordinator_tx
-            .send(ProofCoordinatorCommand::AppendSignature(signed_digest))
-            .await
-            .is_ok());
-    }
-    let (proof, batch_id) = proof_rx.await.expect("channel dropped").unwrap();
-    assert_eq!(batch_id, BatchId::new_for_test(4));
+    };
+    // check normal path
     assert_eq!(proof.digest().clone(), digest);
     assert!(proof.verify(&verifier).is_ok());
-    match proof_manager_rx.recv().await.expect("channel dropped") {
-        ProofManagerCommand::LocalProof(cmd_proof) => assert_eq!(proof, cmd_proof),
-        msg => panic!("Expected LocalProof but received: {:?}", msg),
-    }
-
-    // check wrong signatures
-    let (proof_tx, proof_rx) = oneshot::channel();
-    assert!(proof_coordinator_tx
-        .send(ProofCoordinatorCommand::InitProof(
-            signed_digest_info,
-            BatchId::new_for_test(10),
-            proof_tx
-        ))
-        .await
-        .is_ok());
-    for _ in 0..signers.len() {
-        let signed_digest = SignedDigest::new(
-            batch_author,
-            1,
-            digest,
-            LogicalTime::new(1, 20),
-            1,
-            1,
-            &signers[1],
-        )
-        .unwrap();
-        assert!(proof_coordinator_tx
-            .send(ProofCoordinatorCommand::AppendSignature(signed_digest))
-            .await
-            .is_ok());
-    }
-    assert_eq!(
-        proof_rx.await.expect("channel dropped"),
-        Err(ProofError::Timeout(BatchId::new_for_test(10)))
-    );
-    match proof_manager_rx.try_recv() {
-        Err(TryRecvError::Empty) => {},
-        result => panic!("Expected Empty but instead: {:?}", result),
-    }
 }
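Note (editorial sketch, not part of the patch): the rewritten test above no longer drives the coordinator through InitProof commands; instead it injects a canned MockBatchReader behind an Arc and observes the broadcast proof on a mock network sender. The snippet below shows that trait-object injection pattern in isolation; BatchLookup, MockLookup, Coordinator, and the u64 PeerId alias are simplified stand-ins invented for this example, not the real consensus types.

use std::sync::Arc;

type PeerId = u64; // stand-in for aptos_types::PeerId

trait BatchLookup: Send + Sync {
    /// Returns the author of the batch if the digest is known.
    fn exists(&self, digest: &str) -> Option<PeerId>;
}

struct MockLookup {
    peer: PeerId,
}

impl BatchLookup for MockLookup {
    fn exists(&self, _digest: &str) -> Option<PeerId> {
        // Pretend every digest is locally available and authored by `peer`.
        Some(self.peer)
    }
}

struct Coordinator {
    lookup: Arc<dyn BatchLookup>,
}

impl Coordinator {
    fn new(lookup: Arc<dyn BatchLookup>) -> Self {
        Self { lookup }
    }

    fn author_of(&self, digest: &str) -> Option<PeerId> {
        self.lookup.exists(digest)
    }
}

fn main() {
    // The test passes a mock the same way: Arc::new(MockBatchReader { peer: ... }).
    let coordinator = Coordinator::new(Arc::new(MockLookup { peer: 7 }));
    assert_eq!(coordinator.author_of("digest-0"), Some(7));
}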
diff --git a/consensus/src/quorum_store/tests/proof_manager_test.rs b/consensus/src/quorum_store/tests/proof_manager_test.rs
index 5a21b19c5ff2a..9ce37108ffe6f 100644
--- a/consensus/src/quorum_store/tests/proof_manager_test.rs
+++ b/consensus/src/quorum_store/tests/proof_manager_test.rs
@@ -4,24 +4,33 @@
 use crate::quorum_store::proof_manager::ProofManager;
 use aptos_consensus_types::{
     common::{Payload, PayloadFilter},
-    proof_of_store::{LogicalTime, ProofOfStore, SignedDigestInfo},
+    proof_of_store::{BatchId, LogicalTime, ProofOfStore, SignedDigestInfo},
     request_response::{GetPayloadCommand, GetPayloadResponse},
 };
 use aptos_crypto::HashValue;
 use aptos_types::{aggregate_signature::AggregateSignature, PeerId};
 use futures::channel::oneshot;
+use move_core_types::account_address::AccountAddress;
 use std::collections::HashSet;

 #[tokio::test]
 async fn test_block_request() {
-    let mut proof_manager = ProofManager::new(0, 10, 10);
+    let mut proof_manager = ProofManager::new(0, AccountAddress::random(), 10, 10);

     let digest = HashValue::random();
+    let batch_id = BatchId::new_for_test(1);
     let proof = ProofOfStore::new(
-        SignedDigestInfo::new(PeerId::random(), digest, LogicalTime::new(0, 10), 1, 1),
+        SignedDigestInfo::new(
+            PeerId::random(),
+            batch_id,
+            digest,
+            LogicalTime::new(0, 10),
+            1,
+            1,
+        ),
         AggregateSignature::empty(),
     );
-    proof_manager.handle_remote_proof(proof.clone());
+    proof_manager.receive_proof(proof.clone());

     let (callback_tx, callback_rx) = oneshot::channel();
     let req = GetPayloadCommand::GetPayloadRequest(
diff --git a/consensus/src/quorum_store/tests/quorum_store_db_test.rs b/consensus/src/quorum_store/tests/quorum_store_db_test.rs
index 12e9fed37d4b4..882c0bedb692f 100644
--- a/consensus/src/quorum_store/tests/quorum_store_db_test.rs
+++ b/consensus/src/quorum_store/tests/quorum_store_db_test.rs
@@ -4,9 +4,9 @@
 use crate::quorum_store::{
     quorum_store_db::{QuorumStoreDB, QuorumStoreStorage},
     tests::utils::{compute_digest_from_signed_transaction, create_vec_signed_transactions},
-    types::{BatchId, PersistedValue},
+    types::PersistedValue,
 };
-use aptos_consensus_types::proof_of_store::LogicalTime;
+use aptos_consensus_types::proof_of_store::{BatchId, LogicalTime};
 use aptos_temppath::TempPath;
 use aptos_types::account_address::AccountAddress;
@@ -18,7 +18,13 @@ fn test_db_for_data() {
     let source = AccountAddress::random();
     let signed_txns = create_vec_signed_transactions(100);
     let digest_1 = compute_digest_from_signed_transaction(signed_txns.clone());
-    let value_1 = PersistedValue::new(Some(signed_txns), LogicalTime::new(1, 20), source, 1000);
+    let value_1 = PersistedValue::new(
+        Some(signed_txns),
+        LogicalTime::new(1, 20),
+        source,
+        BatchId::new_for_test(1),
+        1000,
+    );

     assert!(db.save_batch(digest_1, value_1.clone()).is_ok());
     assert_eq!(
@@ -30,12 +36,24 @@
     let signed_txns = create_vec_signed_transactions(200);
     let digest_2 = compute_digest_from_signed_transaction(signed_txns.clone());
-    let value_2 = PersistedValue::new(Some(signed_txns), LogicalTime::new(1, 20), source, 1000);
+    let value_2 = PersistedValue::new(
+        Some(signed_txns),
+        LogicalTime::new(1, 20),
+        source,
+        BatchId::new_for_test(1),
+        1000,
+    );
     assert!(db.save_batch(digest_2, value_2).is_ok());

     let signed_txns = create_vec_signed_transactions(300);
     let digest_3 = compute_digest_from_signed_transaction(signed_txns.clone());
-    let value_3 = PersistedValue::new(Some(signed_txns), LogicalTime::new(1, 20), source, 1000);
+    let value_3 = PersistedValue::new(
+        Some(signed_txns),
+        LogicalTime::new(1, 20),
+        source,
+        BatchId::new_for_test(1),
+        1000,
+    );
     assert!(db.save_batch(digest_3, value_3).is_ok());

     let batches = vec![digest_3];
diff --git a/consensus/src/quorum_store/tests/types_test.rs b/consensus/src/quorum_store/tests/types_test.rs
index 439e7537f3c30..c67d3fc654538 100644
--- a/consensus/src/quorum_store/tests/types_test.rs
+++ b/consensus/src/quorum_store/tests/types_test.rs
@@ -3,9 +3,9 @@
 use crate::quorum_store::{
     tests::utils::create_vec_signed_transactions,
-    types::{Batch, BatchId, BatchRequest, Fragment, SerializedTransaction},
+    types::{Batch, BatchRequest, Fragment, SerializedTransaction},
 };
-use aptos_consensus_types::proof_of_store::LogicalTime;
+use aptos_consensus_types::proof_of_store::{BatchId, LogicalTime};
 use aptos_crypto::hash::DefaultHasher;
 use aptos_types::account_address::AccountAddress;
 use bcs::{from_bytes, to_bytes};
diff --git a/consensus/src/quorum_store/types.rs b/consensus/src/quorum_store/types.rs
index e096195370f3a..dd103141b15fe 100644
--- a/consensus/src/quorum_store/types.rs
+++ b/consensus/src/quorum_store/types.rs
@@ -1,64 +1,13 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0

-use aptos_consensus_types::proof_of_store::LogicalTime;
+use aptos_consensus_types::proof_of_store::{BatchId, LogicalTime};
 use aptos_crypto::{hash::DefaultHasher, HashValue};
 use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher};
 use aptos_types::{transaction::SignedTransaction, PeerId};
 use bcs::to_bytes;
 use serde::{Deserialize, Serialize};
-use std::{
-    cmp::Ordering,
-    fmt::{Display, Formatter},
-    mem,
-};
-
-#[derive(
-    Copy, Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash, CryptoHasher, BCSCryptoHash,
-)]
-pub struct BatchId {
-    pub id: u64,
-    /// A random number that is stored in the DB and updated only if the value does not exist in
-    /// the DB: (a) at the start of an epoch, or (b) the DB was wiped. When the nonce is updated,
-    /// id starts again at 0.
-    pub nonce: u64,
-}
-
-impl BatchId {
-    pub fn new(nonce: u64) -> Self {
-        Self { id: 0, nonce }
-    }
-
-    pub fn new_for_test(id: u64) -> Self {
-        Self { id, nonce: 0 }
-    }
-
-    pub fn increment(&mut self) {
-        self.id += 1;
-    }
-}
-
-impl PartialOrd for BatchId {
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl Ord for BatchId {
-    fn cmp(&self, other: &Self) -> Ordering {
-        match self.id.cmp(&other.id) {
-            Ordering::Equal => {},
-            ordering => return ordering,
-        }
-        self.nonce.cmp(&other.nonce)
-    }
-}
-
-impl Display for BatchId {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "({}, {})", self.id, self.nonce)
-    }
-}
+use std::mem;

 #[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
 pub struct SerializedTransaction {
@@ -97,6 +46,7 @@ pub struct PersistedValue {
     pub maybe_payload: Option<Vec<SignedTransaction>>,
     pub expiration: LogicalTime,
     pub author: PeerId,
+    pub batch_id: BatchId,
     pub num_bytes: usize,
 }

@@ -105,12 +55,14 @@
         maybe_payload: Option<Vec<SignedTransaction>>,
         expiration: LogicalTime,
         author: PeerId,
+        batch_id: BatchId,
         num_bytes: usize,
     ) -> Self {
         Self {
             maybe_payload,
             expiration,
             author,
+            batch_id,
             num_bytes,
         }
     }
diff --git a/consensus/src/quorum_store/utils.rs b/consensus/src/quorum_store/utils.rs
index 9779d89dfc3d3..568b7056c0634 100644
--- a/consensus/src/quorum_store/utils.rs
+++ b/consensus/src/quorum_store/utils.rs
@@ -3,14 +3,11 @@
 use crate::{
     monitor,
-    quorum_store::{
-        counters,
-        types::{BatchId, SerializedTransaction},
-    },
+    quorum_store::{counters, types::SerializedTransaction},
 };
 use aptos_consensus_types::{
     common::{Round, TransactionSummary},
-    proof_of_store::{LogicalTime, ProofOfStore},
+    proof_of_store::{BatchId, LogicalTime, ProofOfStore},
 };
 use aptos_crypto::HashValue;
 use aptos_logger::prelude::*;
@@ -32,6 +29,7 @@ pub(crate) struct BatchBuilder {
     id: BatchId,
+    fragment_id: usize,
     summaries: Vec<TransactionSummary>,
     data: Vec<SerializedTransaction>,
     num_txns: usize,
@@ -44,6 +42,7 @@ impl BatchBuilder {
     pub(crate) fn new(batch_id: BatchId, max_bytes: usize) -> Self {
         Self {
             id: batch_id,
+            fragment_id: 0,
             summaries: Vec::new(),
             data: Vec::new(),
             num_txns: 0,
@@ -52,29 +51,30 @@ impl BatchBuilder {
         }
     }

-    pub(crate) fn append_transaction(
+    pub(crate) fn append_transactions(
         &mut self,
-        txn: &SignedTransaction,
+        txns: &[SignedTransaction],
         max_txns_override: usize,
     ) -> bool {
-        let serialized_txn = SerializedTransaction::from_signed_txn(txn);
-
-        if self.num_bytes + serialized_txn.len() <= self.max_bytes
-            && self.num_txns < max_txns_override
-        {
-            self.num_txns += 1;
-            self.num_bytes += serialized_txn.len();
+        for txn in txns {
+            let serialized_txn = SerializedTransaction::from_signed_txn(txn);

-            self.summaries.push(TransactionSummary {
-                sender: txn.sender(),
-                sequence_number: txn.sequence_number(),
-            });
-            self.data.push(serialized_txn);
-
-            self.num_txns < max_txns_override && self.num_bytes < self.max_bytes
-        } else {
-            false
+            if self.num_bytes + serialized_txn.len() <= self.max_bytes
+                && self.num_txns < max_txns_override
+            {
+                self.num_txns += 1;
+                self.num_bytes += serialized_txn.len();
+
+                self.summaries.push(TransactionSummary {
+                    sender: txn.sender(),
+                    sequence_number: txn.sequence_number(),
+                });
+                self.data.push(serialized_txn);
+            } else {
+                return false;
+            }
         }
+        true
     }

     pub(crate) fn take_serialized_txns(&mut self) -> Vec<SerializedTransaction> {
@@ -89,11 +89,19 @@ impl BatchBuilder {
         self.id
     }

+    pub(crate) fn fetch_and_increment_fragment_id(&mut self) -> usize {
+        let id = self.fragment_id;
+        self.fragment_id += 1;
+        id
+    }
+
     /// Clears the state, increments (batch) id.
     pub(crate) fn take_summaries(&mut self) -> Vec<TransactionSummary> {
         assert!(self.data.is_empty());
+        counters::NUM_FRAGMENT_PER_BATCH.observe((self.fragment_id + 1) as f64);
         self.id.increment();
+        self.fragment_id = 0;
         self.num_bytes = 0;
         self.num_txns = 0;
         mem::take(&mut self.summaries)
@@ -104,23 +112,23 @@
     }
 }

-pub(crate) struct DigestTimeouts {
-    timeouts: VecDeque<(i64, HashValue)>,
+pub(crate) struct Timeouts<T> {
+    timeouts: VecDeque<(i64, T)>,
 }

-impl DigestTimeouts {
+impl<T> Timeouts<T> {
     pub(crate) fn new() -> Self {
         Self {
             timeouts: VecDeque::new(),
         }
     }

-    pub(crate) fn add_digest(&mut self, digest: HashValue, timeout: usize) {
+    pub(crate) fn add(&mut self, value: T, timeout: usize) {
         let expiry = Utc::now().naive_utc().timestamp_millis() + timeout as i64;
-        self.timeouts.push_back((expiry, digest));
+        self.timeouts.push_back((expiry, value));
     }

-    pub(crate) fn expire(&mut self) -> Vec<HashValue> {
+    pub(crate) fn expire(&mut self) -> Vec<T> {
         let cur_time = chrono::Utc::now().naive_utc().timestamp_millis();
         trace!(
             "QS: expire cur time {} timeouts len {}",
@@ -260,8 +268,11 @@ impl ProofQueue {
             },
         }
         if local {
+            counters::LOCAL_POS_COUNT.inc();
             self.local_digest_queue
                 .push_back((*proof.digest(), proof.expiration()));
+        } else {
+            counters::REMOTE_POS_COUNT.inc();
         }
     }
diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs
index 35c49d343dc90..1144b3b60b1f4 100644
--- a/consensus/src/round_manager.rs
+++ b/consensus/src/round_manager.rs
@@ -119,7 +119,7 @@ impl UnverifiedEvent {
             },
             UnverifiedEvent::SignedDigestMsg(sd) => {
                 if !self_message {
-                    sd.verify(validator)?;
+                    sd.verify(peer_id, validator)?;
                 }
                 VerifiedEvent::SignedDigestMsg(sd)
             },
diff --git a/consensus/src/test_utils/mock_quorum_store_sender.rs b/consensus/src/test_utils/mock_quorum_store_sender.rs
index b0831999e87aa..7d29e8534de1d 100644
--- a/consensus/src/test_utils/mock_quorum_store_sender.rs
+++ b/consensus/src/test_utils/mock_quorum_store_sender.rs
@@ -19,8 +19,8 @@ pub struct MockQuorumStoreSender {
 }

 impl MockQuorumStoreSender {
-    pub fn new(rx: Sender<(ConsensusMsg, Vec<AccountAddress>)>) -> Self {
-        Self { tx: rx }
+    pub fn new(tx: Sender<(ConsensusMsg, Vec<AccountAddress>)>) -> Self {
+        Self { tx }
     }
 }

@@ -63,7 +63,13 @@ impl QuorumStoreSender for MockQuorumStoreSender {
         unimplemented!()
     }
-    async fn broadcast_proof_of_store(&mut self, _proof_of_store: ProofOfStore) {
-        unimplemented!()
+    async fn broadcast_proof_of_store(&mut self, proof_of_store: ProofOfStore) {
+        self.tx
+            .send((
+                ConsensusMsg::ProofOfStoreMsg(Box::new(proof_of_store)),
+                vec![],
+            ))
+            .await
+            .unwrap();
     }
 }
diff --git a/testsuite/generate-format/tests/staged/consensus.yaml b/testsuite/generate-format/tests/staged/consensus.yaml
index 692fd16512a02..059456697a4c7 100644
--- a/testsuite/generate-format/tests/staged/consensus.yaml
+++ b/testsuite/generate-format/tests/staged/consensus.yaml
@@ -437,6 +437,8 @@ SignedDigestInfo:
   STRUCT:
     - batch_author:
         TYPENAME: AccountAddress
+    - batch_id:
+        TYPENAME: BatchId
     - digest:
         TYPENAME: HashValue
     - expiration:
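Note (editorial sketch, not part of the patch): utils.rs above generalizes the digest-only timeout queue into Timeouts<T>, so the proof coordinator can track arbitrary values against a deadline. The standalone program below re-declares a Timeouts<T> with the same shape so it compiles on its own; add mirrors the patch's use of chrono millisecond timestamps, while the expire body here is a simplified front-of-queue sweep written for the example, not the production implementation.

use chrono::Utc;
use std::collections::VecDeque;

struct Timeouts<T> {
    timeouts: VecDeque<(i64, T)>,
}

impl<T> Timeouts<T> {
    fn new() -> Self {
        Self {
            timeouts: VecDeque::new(),
        }
    }

    /// Record `value` with a deadline `timeout_ms` milliseconds from now.
    fn add(&mut self, value: T, timeout_ms: usize) {
        let expiry = Utc::now().naive_utc().timestamp_millis() + timeout_ms as i64;
        self.timeouts.push_back((expiry, value));
    }

    /// Pop and return every value whose deadline has passed, assuming entries
    /// were added in roughly increasing deadline order.
    fn expire(&mut self) -> Vec<T> {
        let now = Utc::now().naive_utc().timestamp_millis();
        let mut expired = Vec::new();
        while let Some((deadline, _)) = self.timeouts.front() {
            if *deadline <= now {
                expired.push(self.timeouts.pop_front().unwrap().1);
            } else {
                break;
            }
        }
        expired
    }
}

fn main() {
    // Unlike the old DigestTimeouts, the queue can now hold any payload type,
    // e.g. (PeerId, BatchId) pairs instead of bare digests.
    let mut timeouts: Timeouts<&str> = Timeouts::new();
    timeouts.add("batch-1", 0); // already due
    timeouts.add("batch-2", 10_000); // due in 10 seconds
    assert_eq!(timeouts.expire(), vec!["batch-1"]);
}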