diff --git a/.github/workflows/lint-test.yaml b/.github/workflows/lint-test.yaml index 294e1bdfa299d..abfedabdedd7c 100644 --- a/.github/workflows/lint-test.yaml +++ b/.github/workflows/lint-test.yaml @@ -145,6 +145,8 @@ jobs: with: tool: nextest - run: cargo nextest run --locked --workspace --exclude smoke-test --exclude aptos-testcases --exclude aptos-api --exclude aptos-executor-benchmark --exclude aptos-backup-cli --retries 3 --no-fail-fast -F consensus-only-perf-test + env: + RUST_MIN_STACK: 4297152 rust-smoke-test: runs-on: high-perf-docker diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs index dc3056f9e92a1..d4f2fa603dcd7 100644 --- a/consensus/consensus-types/src/proof_of_store.rs +++ b/consensus/consensus-types/src/proof_of_store.rs @@ -11,7 +11,6 @@ use aptos_types::{ }; use rand::{seq::SliceRandom, thread_rng}; use serde::{Deserialize, Serialize}; -use std::sync::Arc; #[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)] pub struct LogicalTime { @@ -78,7 +77,7 @@ impl SignedDigest { expiration: LogicalTime, num_txns: u64, num_bytes: u64, - validator_signer: Arc, + validator_signer: &ValidatorSigner, ) -> Result { let info = SignedDigestInfo::new(batch_author, digest, expiration, num_txns, num_bytes); let signature = validator_signer.sign(&info)?; diff --git a/consensus/src/counters.rs b/consensus/src/counters.rs index bea8fd95397f1..832f89957e92b 100644 --- a/consensus/src/counters.rs +++ b/consensus/src/counters.rs @@ -634,17 +634,17 @@ pub static QUORUM_STORE_CHANNEL_MSGS: Lazy = Lazy::new(|| { .unwrap() }); -/// Counters(queued,dequeued,dropped) related to block retrieval channel -pub static BLOCK_RETRIEVAL_CHANNEL_MSGS: Lazy = Lazy::new(|| { +/// Counters(queued,dequeued,dropped) related to rpc request channel +pub static RPC_CHANNEL_MSGS: Lazy = Lazy::new(|| { register_int_counter_vec!( - "aptos_consensus_block_retrieval_channel_msgs_count", - "Counters(queued,dequeued,dropped) related to block retrieval channel", + "aptos_consensus_rpc_channel_msgs_count", + "Counters(queued,dequeued,dropped) related to rpc request channel", &["state"] ) .unwrap() }); -/// Counters(queued,dequeued,dropped) related to block retrieval task +/// Counters(queued,dequeued,dropped) related to block retrieval per epoch task pub static BLOCK_RETRIEVAL_TASK_MSGS: Lazy = Lazy::new(|| { register_int_counter_vec!( "aptos_consensus_block_retrieval_task_msgs_count", diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index 6755f192c3db3..36888cc24e0e7 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -29,7 +29,10 @@ use crate::{ logging::{LogEvent, LogSchema}, metrics_safety_rules::MetricsSafetyRules, monitor, - network::{IncomingBlockRetrievalRequest, NetworkReceivers, NetworkSender}, + network::{ + IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest, IncomingRpcRequest, + NetworkReceivers, NetworkSender, + }, network_interface::{ConsensusMsg, ConsensusNetworkClient}, payload_client::QuorumStoreClient, persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData}, @@ -127,6 +130,8 @@ pub struct EpochManager { quorum_store_msg_tx: Option>, quorum_store_coordinator_tx: Option>, quorum_store_storage: Arc, + batch_retrieval_tx: + Option>, } impl EpochManager { @@ -169,6 +174,7 @@ impl EpochManager { quorum_store_msg_tx: None, quorum_store_coordinator_tx: None, quorum_store_storage, + batch_retrieval_tx: None, } } 
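Editor's note (not part of the patch): the epoch_manager.rs hunks above and below replace the dedicated block-retrieval channel with a single RPC entry point that is dispatched per request type. The following is a minimal, std-only sketch of that dispatch pattern, assuming simplified types: `IncomingRpcRequest`, `block_retrieval_tx`, and `batch_retrieval_tx` mirror the names in the patch, while the request structs and the plain `std::sync::mpsc` channel stand in for the real `aptos_channel` and callback types.

use std::sync::mpsc::Sender;

struct IncomingBlockRetrievalRequest { /* request + response callback in the real type */ }
struct IncomingBatchRetrievalRequest { /* request + response callback in the real type */ }

enum IncomingRpcRequest {
    BlockRetrieval(IncomingBlockRetrievalRequest),
    BatchRetrieval(IncomingBatchRetrievalRequest),
}

struct EpochManager {
    // Present only while the owning component is running; dropping the sender
    // (setting the field to None) shuts the corresponding task down.
    block_retrieval_tx: Option<Sender<IncomingBlockRetrievalRequest>>,
    batch_retrieval_tx: Option<Sender<IncomingBatchRetrievalRequest>>,
}

impl EpochManager {
    // Single entry point for consensus RPCs: each variant is routed to the
    // component that owns it, and a missing sender means that component has
    // not been started (or was stopped at the epoch boundary).
    fn process_rpc_request(&self, request: IncomingRpcRequest) -> Result<(), String> {
        match request {
            IncomingRpcRequest::BlockRetrieval(req) => match &self.block_retrieval_tx {
                Some(tx) => tx.send(req).map_err(|e| e.to_string()),
                None => Err("Round manager not started".to_string()),
            },
            IncomingRpcRequest::BatchRetrieval(req) => match &self.batch_retrieval_tx {
                Some(tx) => tx.send(req).map_err(|e| e.to_string()),
                None => Err("Quorum store not started".to_string()),
            },
        }
    }
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    let mgr = EpochManager {
        block_retrieval_tx: Some(tx),
        batch_retrieval_tx: None, // quorum store not started yet
    };
    mgr.process_rpc_request(IncomingRpcRequest::BlockRetrieval(
        IncomingBlockRetrievalRequest {},
    ))
    .unwrap();
    assert!(rx.try_recv().is_ok());
    // A batch retrieval request arriving before the quorum store starts is rejected.
    assert!(mgr
        .process_rpc_request(IncomingRpcRequest::BatchRetrieval(
            IncomingBatchRetrievalRequest {},
        ))
        .is_err());
}

Keeping each sender in an Option lets the epoch manager tear a component down by simply dropping its sender, which is exactly what the patch does when shutting down at the epoch boundary.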
@@ -539,6 +545,7 @@ impl EpochManager { // Shutdown the block retrieval task by dropping the sender self.block_retrieval_tx = None; + self.batch_retrieval_tx = None; if let Some(mut quorum_store_coordinator_tx) = self.quorum_store_coordinator_tx.take() { let (ack_tx, ack_rx) = oneshot::channel(); @@ -693,7 +700,12 @@ impl EpochManager { payload_manager.clone(), )); - self.quorum_store_coordinator_tx = quorum_store_builder.start(); + if let Some((quorum_store_coordinator_tx, batch_retrieval_rx)) = + quorum_store_builder.start() + { + self.quorum_store_coordinator_tx = Some(quorum_store_coordinator_tx); + self.batch_retrieval_tx = Some(batch_retrieval_rx); + } info!(epoch = epoch, "Create ProposalGenerator"); // txn manager is required both by proposal generator (to pull the proposers) @@ -912,8 +924,6 @@ impl EpochManager { ) -> anyhow::Result<()> { match event { UnverifiedEvent::FragmentMsg(_) - | UnverifiedEvent::BatchRequestMsg(_) - | UnverifiedEvent::BatchMsg(_) | UnverifiedEvent::SignedDigestMsg(_) | UnverifiedEvent::ProofOfStoreMsg(_) => { if self.quorum_store_enabled { @@ -941,9 +951,7 @@ impl EpochManager { ); } match event { - quorum_store_event @ (VerifiedEvent::BatchRequestMsg(_) - | VerifiedEvent::UnverifiedBatchMsg(_) - | VerifiedEvent::SignedDigestMsg(_) + quorum_store_event @ (VerifiedEvent::SignedDigestMsg(_) | VerifiedEvent::ProofOfStoreMsg(_) | VerifiedEvent::FragmentMsg(_)) => { if let Some(sender) = &mut self.quorum_store_msg_tx { @@ -975,18 +983,29 @@ impl EpochManager { } } - fn process_block_retrieval( + fn process_rpc_request( &self, peer_id: Author, - request: IncomingBlockRetrievalRequest, + request: IncomingRpcRequest, ) -> anyhow::Result<()> { fail_point!("consensus::process::any", |_| { - Err(anyhow::anyhow!("Injected error in process_block_retrieval")) + Err(anyhow::anyhow!("Injected error in process_rpc_request")) }); - if let Some(tx) = &self.block_retrieval_tx { - tx.push(peer_id, request) - } else { - Err(anyhow::anyhow!("Round manager not started")) + match request { + IncomingRpcRequest::BlockRetrieval(request) => { + if let Some(tx) = &self.block_retrieval_tx { + tx.push(peer_id, request) + } else { + Err(anyhow::anyhow!("Round manager not started")) + } + }, + IncomingRpcRequest::BatchRetrieval(request) => { + if let Some(tx) = &self.batch_retrieval_tx { + tx.push(peer_id, request) + } else { + Err(anyhow::anyhow!("Quorum store not started")) + } + }, } } @@ -1023,8 +1042,8 @@ impl EpochManager { error!(epoch = self.epoch(), error = ?e, kind = error_kind(&e)); } }, - (peer, request) = network_receivers.block_retrieval.select_next_some() => { - if let Err(e) = self.process_block_retrieval(peer, request) { + (peer, request) = network_receivers.rpc_rx.select_next_some() => { + if let Err(e) = self.process_rpc_request(peer, request) { error!(epoch = self.epoch(), error = ?e, kind = error_kind(&e)); } }, diff --git a/consensus/src/logging.rs b/consensus/src/logging.rs index 5af644fda6d15..30e02ff5526c7 100644 --- a/consensus/src/logging.rs +++ b/consensus/src/logging.rs @@ -22,6 +22,7 @@ pub enum LogEvent { NewEpoch, NewRound, Propose, + ReceiveBatchRetrieval, ReceiveBlockRetrieval, ReceiveEpochChangeProof, ReceiveEpochRetrieval, diff --git a/consensus/src/network.rs b/consensus/src/network.rs index f26d95465da6c..aa2069f0ea887 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -53,6 +53,19 @@ pub struct IncomingBlockRetrievalRequest { pub response_sender: oneshot::Sender>, } +#[derive(Debug)] +pub struct IncomingBatchRetrievalRequest { 
+ pub req: BatchRequest, + pub protocol: ProtocolId, + pub response_sender: oneshot::Sender>, +} + +#[derive(Debug)] +pub enum IncomingRpcRequest { + BlockRetrieval(IncomingBlockRetrievalRequest), + BatchRetrieval(IncomingBatchRetrievalRequest), +} + /// Just a convenience struct to keep all the network proxy receiving queues in one place. /// Will be returned by the NetworkTask upon startup. pub struct NetworkReceivers { @@ -65,14 +78,20 @@ pub struct NetworkReceivers { (AccountAddress, Discriminant), (AccountAddress, ConsensusMsg), >, - pub block_retrieval: - aptos_channel::Receiver, + pub rpc_rx: aptos_channel::Receiver, } #[async_trait::async_trait] -pub(crate) trait QuorumStoreSender { +pub trait QuorumStoreSender: Send + Clone { async fn send_batch_request(&self, request: BatchRequest, recipients: Vec); + async fn request_batch( + &self, + request: BatchRequest, + recipient: Author, + timeout: Duration, + ) -> anyhow::Result; + async fn send_batch(&self, batch: Batch, recipients: Vec); async fn send_signed_digest(&self, signed_digest: SignedDigest, recipients: Vec); @@ -311,6 +330,23 @@ impl QuorumStoreSender for NetworkSender { self.send(msg, recipients).await } + async fn request_batch( + &self, + request: BatchRequest, + recipient: Author, + timeout: Duration, + ) -> anyhow::Result { + let msg = ConsensusMsg::BatchRequestMsg(Box::new(request)); + let response = self + .consensus_network_client + .send_rpc(recipient, msg, timeout) + .await?; + match response { + ConsensusMsg::BatchMsg(batch) => Ok(*batch), + _ => Err(anyhow!("Invalid batch response")), + } + } + async fn send_batch(&self, batch: Batch, recipients: Vec) { fail_point!("consensus::send::batch", |_| ()); let msg = ConsensusMsg::BatchMsg(Box::new(batch)); @@ -345,8 +381,7 @@ pub struct NetworkTask { (AccountAddress, Discriminant), (AccountAddress, ConsensusMsg), >, - block_retrieval_tx: - aptos_channel::Sender, + rpc_tx: aptos_channel::Sender, all_events: Box> + Send + Unpin>, } @@ -364,11 +399,8 @@ impl NetworkTask { 50, Some(&counters::QUORUM_STORE_CHANNEL_MSGS), ); - let (block_retrieval_tx, block_retrieval) = aptos_channel::new( - QueueStyle::LIFO, - 1, - Some(&counters::BLOCK_RETRIEVAL_CHANNEL_MSGS), - ); + let (rpc_tx, rpc_rx) = + aptos_channel::new(QueueStyle::LIFO, 1, Some(&counters::RPC_CHANNEL_MSGS)); // Verify the network events have been constructed correctly let network_and_events = network_service_events.into_network_and_events(); @@ -387,13 +419,13 @@ impl NetworkTask { NetworkTask { consensus_messages_tx, quorum_store_messages_tx, - block_retrieval_tx, + rpc_tx, all_events, }, NetworkReceivers { consensus_messages, quorum_store_messages, - block_retrieval, + rpc_rx, }, ) } @@ -422,10 +454,11 @@ impl NetworkTask { .with_label_values(&[msg.name()]) .inc(); match msg { + ConsensusMsg::BatchRequestMsg(_) | ConsensusMsg::BatchMsg(_) => { + warn!("unexpected msg"); + }, quorum_store_msg @ (ConsensusMsg::SignedDigestMsg(_) | ConsensusMsg::FragmentMsg(_) - | ConsensusMsg::BatchRequestMsg(_) - | ConsensusMsg::BatchMsg(_) | ConsensusMsg::ProofOfStoreMsg(_)) => { Self::push_msg( peer_id, @@ -463,15 +496,33 @@ impl NetworkTask { ); continue; } - let req_with_callback = IncomingBlockRetrievalRequest { - req: *request, - protocol, - response_sender: callback, - }; - if let Err(e) = self - .block_retrieval_tx - .push(peer_id, (peer_id, req_with_callback)) - { + let req_with_callback = + IncomingRpcRequest::BlockRetrieval(IncomingBlockRetrievalRequest { + req: *request, + protocol, + response_sender: callback, + }); + 
if let Err(e) = self.rpc_tx.push(peer_id, (peer_id, req_with_callback)) { + warn!(error = ?e, "aptos channel closed"); + } + }, + ConsensusMsg::BatchRequestMsg(request) => { + counters::CONSENSUS_RECEIVED_MSGS + .with_label_values(&["BatchRetrievalRequest"]) + .inc(); + debug!( + remote_peer = peer_id, + event = LogEvent::ReceiveBatchRetrieval, + "{:?}", + request + ); + let req_with_callback = + IncomingRpcRequest::BatchRetrieval(IncomingBatchRetrievalRequest { + req: *request, + protocol, + response_sender: callback, + }); + if let Err(e) = self.rpc_tx.push(peer_id, (peer_id, req_with_callback)) { warn!(error = ?e, "aptos channel closed"); } }, diff --git a/consensus/src/network_tests.rs b/consensus/src/network_tests.rs index adc5215d54df2..7f15149f55bda 100644 --- a/consensus/src/network_tests.rs +++ b/consensus/src/network_tests.rs @@ -477,7 +477,7 @@ impl DropConfigRound { mod tests { use super::*; use crate::{ - network::NetworkTask, + network::{IncomingRpcRequest, NetworkTask}, network_interface::{DIRECT_SEND, RPC}, }; use aptos_config::network_id::{NetworkId, PeerNetworkId}; @@ -749,9 +749,9 @@ mod tests { ); // verify request block rpc - let mut block_retrieval = receiver_1.block_retrieval; + let mut rpc_rx = receiver_1.rpc_rx; let on_request_block = async move { - while let Some((_, request)) = block_retrieval.next().await { + while let Some((_, request)) = rpc_rx.next().await { // make sure the network task is not blocked during RPC // we limit the network notification queue size to 1 so if it's blocked, // we can not process 2 votes and the test will timeout @@ -764,7 +764,12 @@ mod tests { BlockRetrievalResponse::new(BlockRetrievalStatus::IdNotFound, vec![]); let response = ConsensusMsg::BlockRetrievalResponse(Box::new(response)); let bytes = Bytes::from(serde_json::to_vec(&response).unwrap()); - request.response_sender.send(Ok(bytes)).unwrap(); + match request { + IncomingRpcRequest::BlockRetrieval(request) => { + request.response_sender.send(Ok(bytes)).unwrap() + }, + _ => panic!("unexpected message"), + } } }; runtime.handle().spawn(on_request_block); @@ -824,13 +829,13 @@ mod tests { .unwrap(); let f_check = async move { - assert!(network_receivers.block_retrieval.next().await.is_some()); + assert!(network_receivers.rpc_rx.next().await.is_some()); drop(peer_mgr_notifs_tx); drop(connection_notifs_tx); drop(self_sender); - assert!(network_receivers.block_retrieval.next().await.is_none()); + assert!(network_receivers.rpc_rx.next().await.is_none()); assert!(network_receivers.consensus_messages.next().await.is_none()); }; let f_network_task = network_task.start(); diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager.rs index d50b19ab900c8..3483263b372c2 100644 --- a/consensus/src/payload_manager.rs +++ b/consensus/src/payload_manager.rs @@ -1,8 +1,9 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::quorum_store::{ - batch_reader::BatchReader, quorum_store_coordinator::CoordinatorCommand, +use crate::{ + network::NetworkSender, + quorum_store::{batch_store::BatchStore, quorum_store_coordinator::CoordinatorCommand}, }; use aptos_consensus_types::{ block::Block, @@ -22,14 +23,17 @@ use tokio::sync::oneshot; /// If QuorumStore is enabled, has to ask BatchReader for the transaction behind the proofs of availability in the payload. 
pub enum PayloadManager { DirectMempool, - InQuorumStore(Arc, Mutex>), + InQuorumStore( + Arc>, + Mutex>, + ), } impl PayloadManager { async fn request_transactions( proofs: Vec, logical_time: LogicalTime, - batch_reader: &BatchReader, + batch_store: &BatchStore, ) -> Vec<( HashValue, oneshot::Receiver, aptos_executor_types::Error>>, @@ -43,7 +47,7 @@ impl PayloadManager { logical_time ); if logical_time <= pos.expiration() { - receivers.push((*pos.digest(), batch_reader.get_batch(pos).await)); + receivers.push((*pos.digest(), batch_store.get_batch(pos))); } else { debug!("QS: skipped expired pos"); } @@ -55,8 +59,8 @@ impl PayloadManager { pub async fn notify_commit(&self, logical_time: LogicalTime, payloads: Vec) { match self { PayloadManager::DirectMempool => {}, - PayloadManager::InQuorumStore(batch_reader, coordinator_tx) => { - batch_reader.update_certified_round(logical_time).await; + PayloadManager::InQuorumStore(batch_store, coordinator_tx) => { + batch_store.update_certified_round(logical_time).await; let digests: Vec = payloads .into_iter() @@ -96,13 +100,13 @@ impl PayloadManager { }; match self { PayloadManager::DirectMempool => {}, - PayloadManager::InQuorumStore(batch_reader, _) => match payload { + PayloadManager::InQuorumStore(batch_store, _) => match payload { Payload::InQuorumStore(proof_with_status) => { if proof_with_status.status.lock().is_none() { let receivers = PayloadManager::request_transactions( proof_with_status.proofs.clone(), LogicalTime::new(block.epoch(), block.round()), - batch_reader, + batch_store, ) .await; proof_with_status @@ -129,7 +133,7 @@ impl PayloadManager { match (self, payload) { (PayloadManager::DirectMempool, Payload::DirectMempool(txns)) => Ok(txns.clone()), ( - PayloadManager::InQuorumStore(batch_reader, _), + PayloadManager::InQuorumStore(batch_store, _), Payload::InQuorumStore(proof_with_data), ) => { let status = proof_with_data.status.lock().take(); @@ -152,7 +156,7 @@ impl PayloadManager { let new_receivers = PayloadManager::request_transactions( proof_with_data.proofs.clone(), LogicalTime::new(block.epoch(), block.round()), - batch_reader, + batch_store, ) .await; // Could not get all data so requested again @@ -169,7 +173,7 @@ impl PayloadManager { let new_receivers = PayloadManager::request_transactions( proof_with_data.proofs.clone(), LogicalTime::new(block.epoch(), block.round()), - batch_reader, + batch_store, ) .await; // Could not get all data so requested again diff --git a/consensus/src/quorum_store/batch_coordinator.rs b/consensus/src/quorum_store/batch_coordinator.rs index b354d2cc044b3..9b040084865e7 100644 --- a/consensus/src/quorum_store/batch_coordinator.rs +++ b/consensus/src/quorum_store/batch_coordinator.rs @@ -5,7 +5,7 @@ use crate::{ network::{NetworkSender, QuorumStoreSender}, quorum_store::{ batch_aggregator::BatchAggregator, - batch_store::{BatchStoreCommand, PersistRequest}, + batch_store::{BatchStore, PersistRequest}, counters, proof_coordinator::{ProofCoordinatorCommand, ProofReturnChannel}, types::{BatchId, Fragment, SerializedTransaction}, @@ -14,7 +14,7 @@ use crate::{ use aptos_consensus_types::proof_of_store::{LogicalTime, SignedDigestInfo}; use aptos_logger::prelude::*; use aptos_types::PeerId; -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use tokio::sync::{ mpsc::{Receiver, Sender}, oneshot, @@ -38,7 +38,7 @@ pub struct BatchCoordinator { my_peer_id: PeerId, network_sender: NetworkSender, command_rx: Receiver, - batch_store_tx: Sender, + batch_store: Arc>, 
proof_coordinator_tx: Sender, max_batch_bytes: usize, remote_batch_aggregators: HashMap, @@ -52,7 +52,7 @@ impl BatchCoordinator { my_peer_id: PeerId, network_sender: NetworkSender, wrapper_command_rx: Receiver, - batch_store_tx: Sender, + batch_store: Arc>, proof_coordinator_tx: Sender, max_batch_bytes: usize, ) -> Self { @@ -61,7 +61,7 @@ impl BatchCoordinator { my_peer_id, network_sender, command_rx: wrapper_command_rx, - batch_store_tx, + batch_store, proof_coordinator_tx, max_batch_bytes, remote_batch_aggregators: HashMap::new(), @@ -105,7 +105,7 @@ impl BatchCoordinator { batch_id: BatchId, expiration: LogicalTime, proof_tx: ProofReturnChannel, - ) -> (BatchStoreCommand, Fragment) { + ) -> (PersistRequest, Fragment) { match self.local_batch_aggregator.end_batch( batch_id, self.local_fragment_id, @@ -143,7 +143,7 @@ impl BatchCoordinator { num_bytes, expiration, ); - (BatchStoreCommand::Persist(persist_request), fragment) + (persist_request, fragment) }, Err(e) => { unreachable!( @@ -154,7 +154,7 @@ impl BatchCoordinator { } } - async fn handle_fragment(&mut self, fragment: Fragment) { + async fn handle_fragment(&mut self, fragment: Fragment) -> Option { let source = fragment.source(); let entry = self .remote_batch_aggregators @@ -176,13 +176,9 @@ impl BatchCoordinator { fragment.into_transactions(), ) { Ok((num_bytes, payload, digest)) => { - let persist_cmd = BatchStoreCommand::Persist(PersistRequest::new( - source, payload, digest, num_bytes, expiration, - )); - self.batch_store_tx - .send(persist_cmd) - .await - .expect("BatchStore receiver not available"); + let persist_request = + PersistRequest::new(source, payload, digest, num_bytes, expiration); + return Some(persist_request); }, Err(e) => { debug!("Could not append batch from {:?}, error {:?}", source, e); @@ -204,6 +200,24 @@ impl BatchCoordinator { ) { debug!("Could not append batch from {:?}, error {:?}", source, e); } + None + } + + fn persist_and_send_digest(&self, persist_request: PersistRequest) { + let batch_store = self.batch_store.clone(); + let network_sender = self.network_sender.clone(); + let my_peer_id = self.my_peer_id; + tokio::spawn(async move { + let peer_id = persist_request.value.author; + if let Some(signed_digest) = batch_store.persist(persist_request) { + if my_peer_id != peer_id { + counters::DELIVERED_BATCHES_COUNT.inc(); + } + network_sender + .send_signed_digest(signed_digest, vec![peer_id]) + .await; + } + }); } pub(crate) async fn start(mut self) { @@ -229,23 +243,21 @@ impl BatchCoordinator { proof_tx, ) => { debug!("QS: end batch cmd received, batch id = {}", batch_id); - let (batch_store_command, fragment) = self + let (persist_request, fragment) = self .handle_end_batch(fragment_payload, batch_id, logical_time, proof_tx) .await; self.network_sender.broadcast_fragment(fragment).await; - - self.batch_store_tx - .send(batch_store_command) - .await - .expect("Failed to send to BatchStore"); + self.persist_and_send_digest(persist_request); counters::NUM_FRAGMENT_PER_BATCH.observe((self.local_fragment_id + 1) as f64); self.local_fragment_id = 0; }, BatchCoordinatorCommand::RemoteFragment(fragment) => { - self.handle_fragment(*fragment).await; + if let Some(persist_request) = self.handle_fragment(*fragment).await { + self.persist_and_send_digest(persist_request); + } }, } } diff --git a/consensus/src/quorum_store/batch_reader.rs b/consensus/src/quorum_store/batch_reader.rs deleted file mode 100644 index d06fe11b8693d..0000000000000 --- a/consensus/src/quorum_store/batch_reader.rs +++ /dev/null @@ 
-1,491 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::{ - network::QuorumStoreSender, - quorum_store::{ - batch_requester::BatchRequester, - batch_store::BatchStoreCommand, - counters, - types::{Batch, PersistedValue}, - utils::RoundExpirations, - }, -}; -use anyhow::bail; -use aptos_consensus_types::{ - common::Round, - proof_of_store::{LogicalTime, ProofOfStore}, -}; -use aptos_crypto::HashValue; -use aptos_executor_types::Error; -use aptos_logger::debug; -use aptos_types::{transaction::SignedTransaction, validator_verifier::ValidatorVerifier, PeerId}; -use dashmap::{ - mapref::entry::Entry::{Occupied, Vacant}, - DashMap, -}; -use fail::fail_point; -use once_cell::sync::OnceCell; -use std::{ - collections::HashMap, - sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, Mutex, - }, - time::Duration, -}; -use tokio::{ - sync::{ - mpsc::{Receiver, Sender}, - oneshot, Notify, - }, - time, -}; - -#[derive(Debug)] -pub(crate) enum BatchReaderCommand { - GetBatchForPeer(HashValue, PeerId), - GetBatchForSelf( - ProofOfStore, - oneshot::Sender, Error>>, - ), - BatchResponse(HashValue, Vec), -} - -#[derive(PartialEq)] -enum StorageMode { - PersistedOnly, - MemoryAndPersisted, -} - -struct QuotaManager { - memory_balance: usize, - db_balance: usize, - memory_quota: usize, - db_quota: usize, -} - -impl QuotaManager { - fn new(max_db_balance: usize, max_memory_balance: usize) -> Self { - assert!(max_db_balance >= max_memory_balance); - Self { - memory_balance: 0, - db_balance: 0, - memory_quota: max_memory_balance, - db_quota: max_db_balance, - } - } - - pub(crate) fn update_quota(&mut self, num_bytes: usize) -> anyhow::Result { - if self.memory_balance + num_bytes <= self.memory_quota { - self.memory_balance += num_bytes; - self.db_balance += num_bytes; - Ok(StorageMode::MemoryAndPersisted) - } else if self.db_balance + num_bytes <= self.db_quota { - self.db_balance += num_bytes; - Ok(StorageMode::PersistedOnly) - } else { - counters::EXCEEDED_STORAGE_QUOTA_COUNT.inc(); - bail!("Storage quota exceeded "); - } - } - - pub(crate) fn free_quota(&mut self, num_bytes: usize, storage_mode: StorageMode) { - match storage_mode { - StorageMode::PersistedOnly => { - self.db_balance -= num_bytes; - }, - StorageMode::MemoryAndPersisted => { - self.memory_balance -= num_bytes; - self.db_balance -= num_bytes; - }, - } - } -} - -fn payload_storage_mode(persisted_value: &PersistedValue) -> StorageMode { - match persisted_value.maybe_payload { - Some(_) => StorageMode::MemoryAndPersisted, - None => StorageMode::PersistedOnly, - } -} - -/// Provides in memory representation of stored batches (strong cache), and allows -/// efficient concurrent readers. 
-pub struct BatchReader { - epoch: OnceCell, - my_peer_id: PeerId, - last_certified_round: AtomicU64, - db_cache: DashMap, - peer_quota: DashMap, - expirations: Mutex>, - batch_store_tx: Sender, - self_tx: Sender, - batch_expiry_round_gap_when_init: Round, - batch_expiry_round_gap_behind_latest_certified: Round, - batch_expiry_round_gap_beyond_latest_certified: Round, - expiry_grace_rounds: Round, - memory_quota: usize, - db_quota: usize, - shutdown_flag: AtomicBool, - shutdown_notify: Notify, -} - -impl BatchReader { - pub(crate) fn new( - epoch: u64, - last_certified_round: Round, - db_content: HashMap, - my_peer_id: PeerId, - batch_store_tx: Sender, - self_tx: Sender, - batch_expiry_round_gap_when_init: Round, - batch_expiry_round_gap_behind_latest_certified: Round, - batch_expiry_round_gap_beyond_latest_certified: Round, - expiry_grace_rounds: Round, - memory_quota: usize, - db_quota: usize, - ) -> (Arc, Vec) { - let self_ob = Self { - epoch: OnceCell::with_value(epoch), - my_peer_id, - last_certified_round: AtomicU64::new(last_certified_round), - db_cache: DashMap::new(), - peer_quota: DashMap::new(), - expirations: Mutex::new(RoundExpirations::new()), - batch_store_tx, - self_tx, - batch_expiry_round_gap_when_init, - batch_expiry_round_gap_behind_latest_certified, - batch_expiry_round_gap_beyond_latest_certified, - expiry_grace_rounds, - memory_quota, - db_quota, - shutdown_flag: AtomicBool::new(false), - shutdown_notify: Notify::new(), - }; - - let mut expired_keys = Vec::new(); - debug!( - "QS: Batchreader {} {} {}", - db_content.len(), - epoch, - last_certified_round - ); - for (digest, value) in db_content { - let expiration = value.expiration; - - debug!( - "QS: Batchreader recovery content exp {:?}, digest {}", - expiration, digest - ); - assert!(epoch >= expiration.epoch()); - - if epoch > expiration.epoch() - || last_certified_round >= expiration.round() + expiry_grace_rounds - { - expired_keys.push(digest); - } else { - self_ob - .update_cache(digest, value) - .expect("Storage limit exceeded upon BatchReader construction"); - } - } - - debug!( - "QS: Batchreader recovery expired keys len {}", - expired_keys.len() - ); - (Arc::new(self_ob), expired_keys) - } - - fn epoch(&self) -> u64 { - *self.epoch.get().unwrap() - } - - // Return an error if storage quota is exceeded. - fn update_cache(&self, digest: HashValue, mut value: PersistedValue) -> anyhow::Result<()> { - let author = value.author; - if self - .peer_quota - .entry(author) - .or_insert(QuotaManager::new(self.db_quota, self.memory_quota)) - .update_quota(value.num_bytes)? 
- == StorageMode::PersistedOnly - { - value.remove_payload(); - } - - let expiration_round = value.expiration.round(); - if let Some(prev_value) = self.db_cache.insert(digest, value) { - self.free_quota(prev_value); - } - self.expirations - .lock() - .unwrap() - .add_item(digest, expiration_round); - Ok(()) - } - - pub(crate) fn save(&self, digest: HashValue, value: PersistedValue) -> anyhow::Result { - if value.expiration.epoch() == self.epoch() { - // record the round gaps - if value.expiration.round() > self.last_certified_round() { - counters::GAP_BETWEEN_BATCH_EXPIRATION_AND_LAST_CERTIFIED_ROUND_HIGHER - .observe((value.expiration.round() - self.last_certified_round()) as f64); - } - if value.expiration.round() < self.last_certified_round() { - counters::GAP_BETWEEN_BATCH_EXPIRATION_AND_LAST_CERTIFIED_ROUND_LOWER - .observe((self.last_certified_round() - value.expiration.round()) as f64); - } - - if value.expiration.round() + self.batch_expiry_round_gap_behind_latest_certified - >= self.last_certified_round() - && value.expiration.round() - <= self.last_certified_round() - + self.batch_expiry_round_gap_beyond_latest_certified - { - fail_point!("quorum_store::save", |_| { - // Skip caching and storing value to the db - Ok(false) - }); - - if let Some(entry) = self.db_cache.get(&digest) { - if entry.expiration.round() >= value.expiration.round() { - debug!("QS: already have the digest with higher expiration"); - return Ok(false); - } - } - self.update_cache(digest, value)?; - return Ok(true); - } - } - bail!("Incorrect expiration {:?} with init gap {} in epoch {}, last committed round {} and max behind gap {} max beyond gap {}", - value.expiration, - self.batch_expiry_round_gap_when_init, - self.epoch(), - self.last_certified_round(), - self.batch_expiry_round_gap_behind_latest_certified, - self.batch_expiry_round_gap_beyond_latest_certified); - } - - pub async fn shutdown(&self) { - self.shutdown_flag.swap(true, Ordering::Relaxed); - self.shutdown_notify.notified().await; - } - - fn clear_expired_payload(&self, certified_time: LogicalTime) -> Vec { - assert_eq!( - certified_time.epoch(), - self.epoch(), - "Execution epoch inconsistent with BatchReader" - ); - - let expired_round = if certified_time.round() >= self.expiry_grace_rounds { - certified_time.round() - self.expiry_grace_rounds - } else { - 0 - }; - - let expired_digests = self.expirations.lock().unwrap().expire(expired_round); - let mut ret = Vec::new(); - for h in expired_digests { - let removed_value = match self.db_cache.entry(h) { - Occupied(entry) => { - // We need to check up-to-date expiration again because receiving the same - // digest with a higher expiration would update the persisted value and - // effectively extend the expiration. - if entry.get().expiration.round() <= expired_round { - Some(entry.remove()) - } else { - None - } - }, - Vacant(_) => unreachable!("Expired entry not in cache"), - }; - // No longer holding the lock on db_cache entry. - if let Some(value) = removed_value { - self.free_quota(value); - ret.push(h); - } - } - ret - } - - fn free_quota(&self, persisted_value: PersistedValue) { - let mut quota_manager = self - .peer_quota - .get_mut(&persisted_value.author) - .expect("No QuotaManager for batch author"); - quota_manager.free_quota( - persisted_value.num_bytes, - payload_storage_mode(&persisted_value), - ); - } - - // TODO: make sure state-sync also sends the message, or execution cleans. 
- // When self.expiry_grace_rounds == 0, certified time contains a round for - // which execution result has been certified by a quorum, and as such, the - // batches with expiration in this round can be cleaned up. The parameter - // expiry grace rounds just keeps the batches around for a little longer - // for lagging nodes to be able to catch up (without state-sync). - pub async fn update_certified_round(&self, certified_time: LogicalTime) { - debug!("QS: batch reader updating time {:?}", certified_time); - assert_eq!( - self.epoch(), - certified_time.epoch(), - "QS: wrong epoch {} != {}", - self.epoch(), - certified_time.epoch() - ); - - let prev_round = self - .last_certified_round - .fetch_max(certified_time.round(), Ordering::SeqCst); - // Note: prev_round may be equal to certified_time round due to state-sync - // at the epoch boundary. - assert!( - prev_round <= certified_time.round(), - "Decreasing executed rounds reported to BatchReader {} {}", - prev_round, - certified_time.round(), - ); - - let expired_keys = self.clear_expired_payload(certified_time); - if let Err(e) = self - .batch_store_tx - .send(BatchStoreCommand::Clean(expired_keys)) - .await - { - debug!("QS: Failed to send to BatchStore: {:?}", e); - } - } - - fn last_certified_round(&self) -> Round { - self.last_certified_round.load(Ordering::Relaxed) - } - - pub async fn get_batch( - &self, - proof: ProofOfStore, - ) -> oneshot::Receiver, Error>> { - let (tx, rx) = oneshot::channel(); - - if let Some(value) = self.db_cache.get(proof.digest()) { - if payload_storage_mode(&value) == StorageMode::PersistedOnly { - assert!( - value.maybe_payload.is_none(), - "BatchReader payload and storage kind mismatch" - ); - self.batch_store_tx - .send(BatchStoreCommand::BatchRequest( - *proof.digest(), - self.my_peer_id, - Some(tx), - )) - .await - .expect("Failed to send to BatchStore"); - } else { - // Available in memory. - if tx - .send(Ok(value - .maybe_payload - .clone() - .expect("BatchReader payload and storage kind mismatch"))) - .is_err() - { - debug!( - "Receiver of requested batch is not available for digest {}", - proof.digest() - ); - } - } - } else { - // Quorum store metrics - counters::MISSED_BATCHES_COUNT.inc(); - - self.self_tx - .send(BatchReaderCommand::GetBatchForSelf(proof, tx)) - .await - .expect("Batch Reader Receiver is not available"); - } - rx - } - - pub(crate) async fn start( - &self, - mut batch_reader_rx: Receiver, - network_sender: T, - request_num_peers: usize, - request_timeout_ms: usize, - verifier: ValidatorVerifier, - ) { - debug!( - "[QS worker] BatchReader worker for epoch {} starting", - self.epoch() - ); - - let mut batch_requester = BatchRequester::new( - self.epoch(), - self.my_peer_id, - request_num_peers, - request_timeout_ms, - network_sender.clone(), - ); - - let mut interval = time::interval(Duration::from_millis(100)); - - loop { - tokio::select! 
{ - biased; - - _ = interval.tick() => { - batch_requester.handle_timeouts().await; - if self.shutdown_flag.load(Ordering::Relaxed) { - break; - } - }, - - Some(cmd) = batch_reader_rx.recv() => { - match cmd { - BatchReaderCommand::GetBatchForPeer(digest, peer_id) => { - if let Some(value) = self.db_cache.get(&digest) { - match payload_storage_mode(&value) { - StorageMode::PersistedOnly => { - assert!(value.maybe_payload.is_none(), "BatchReader payload and storage kind mismatch"); - if self.batch_store_tx.send(BatchStoreCommand::BatchRequest(digest, peer_id, None)).await.is_err() { - debug!("Failed to send request to BatchStore"); - } - }, - StorageMode::MemoryAndPersisted => { - let batch = Batch::new( - self.my_peer_id, - self.epoch(), - digest, - value.maybe_payload.clone().expect("BatchReader payload and storage kind mismatch"), - ); - network_sender.send_batch(batch, vec![peer_id]).await; - }, - } // TODO: consider returning Nack - } - }, - BatchReaderCommand::GetBatchForSelf(proof, ret_tx) => { - batch_requester - .add_request(*proof.digest(), proof.shuffled_signers(&verifier), ret_tx) - .await; - }, - BatchReaderCommand::BatchResponse(digest, payload) => { - batch_requester.serve_request(digest, payload); - }, - } - }, - } - } - - self.shutdown_notify.notify_one(); - debug!( - "[QS worker] BatchReader worker for epoch {} stopping", - self.epoch() - ); - } -} diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs index 578458539d30d..fec2f0b963a41 100644 --- a/consensus/src/quorum_store/batch_requester.rs +++ b/consensus/src/quorum_store/batch_requester.rs @@ -3,14 +3,14 @@ use crate::{ network::QuorumStoreSender, - quorum_store::{counters, types::BatchRequest, utils::DigestTimeouts}, + quorum_store::{counters, types::BatchRequest}, }; -use aptos_crypto::{hash::DefaultHasher, HashValue}; +use aptos_crypto::HashValue; use aptos_executor_types::*; use aptos_logger::debug; use aptos_types::{transaction::SignedTransaction, PeerId}; -use bcs::to_bytes; -use std::collections::HashMap; +use futures::{stream::FuturesUnordered, StreamExt}; +use std::time::Duration; use tokio::sync::oneshot; struct BatchRequesterState { @@ -79,17 +79,15 @@ impl BatchRequesterState { } } -pub(crate) struct BatchRequester { +pub(crate) struct BatchRequester { epoch: u64, my_peer_id: PeerId, request_num_peers: usize, request_timeout_ms: usize, - digest_to_state: HashMap, - timeouts: DigestTimeouts, network_sender: T, } -impl BatchRequester { +impl BatchRequester { pub(crate) fn new( epoch: u64, my_peer_id: PeerId, @@ -102,71 +100,46 @@ impl BatchRequester { my_peer_id, request_num_peers, request_timeout_ms, - digest_to_state: HashMap::new(), - timeouts: DigestTimeouts::new(), network_sender, } } - async fn send_requests(&self, digest: HashValue, request_peers: Vec) { - // Quorum Store measurements - counters::SENT_BATCH_REQUEST_COUNT.inc(); - let request = BatchRequest::new(self.my_peer_id, self.epoch, digest); - self.network_sender - .send_batch_request(request, request_peers) - .await; - } - - pub(crate) async fn add_request( - &mut self, + pub(crate) fn request_batch( + &self, digest: HashValue, signers: Vec, - ret_tx: oneshot::Sender, aptos_executor_types::Error>>, + ret_tx: oneshot::Sender, Error>>, ) { let mut request_state = BatchRequesterState::new(signers, ret_tx); - let request_peers = request_state - .next_request_peers(self.request_num_peers) - .unwrap(); // note: this is the first try - - debug!("QS: requesting from {:?}", request_peers); + let 
network_sender = self.network_sender.clone(); + let request_num_peers = self.request_num_peers; + let my_peer_id = self.my_peer_id; + let epoch = self.epoch; + let timeout = Duration::from_millis(self.request_timeout_ms as u64); - self.digest_to_state.insert(digest, request_state); - self.send_requests(digest, request_peers).await; - self.timeouts.add_digest(digest, self.request_timeout_ms); - } - - pub(crate) async fn handle_timeouts(&mut self) { - for digest in self.timeouts.expire() { - debug!("QS: timed out batch request, digest = {}", digest); - if let Some(state) = self.digest_to_state.get_mut(&digest) { - if let Some(request_peers) = state.next_request_peers(self.request_num_peers) { - // Quorum Store measurements - counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc(); - self.send_requests(digest, request_peers).await; - self.timeouts.add_digest(digest, self.request_timeout_ms); - } else { - let state = self.digest_to_state.remove(&digest).unwrap(); - state.serve_request(digest, None); + tokio::spawn(async move { + while let Some(request_peers) = request_state.next_request_peers(request_num_peers) { + let mut futures = FuturesUnordered::new(); + debug!("QS: requesting from {:?}", request_peers); + let request = BatchRequest::new(my_peer_id, epoch, digest); + for peer in request_peers { + counters::SENT_BATCH_REQUEST_COUNT.inc(); + futures.push(network_sender.request_batch(request.clone(), peer, timeout)); } + while let Some(response) = futures.next().await { + if let Ok(batch) = response { + counters::RECEIVED_BATCH_COUNT.inc(); + if batch.verify().is_ok() { + let digest = batch.digest(); + let payload = batch.into_payload(); + request_state.serve_request(digest, Some(payload)); + return; + } + } + } + counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc(); } - } - } - - pub(crate) fn serve_request(&mut self, digest: HashValue, payload: Vec) { - if self.digest_to_state.contains_key(&digest) { - let mut hasher = DefaultHasher::new(b"QuorumStoreBatch"); - let serialized_payload: Vec = payload - .iter() - .flat_map(|txn| to_bytes(txn).unwrap()) - .collect(); - hasher.update(&serialized_payload); - if hasher.finish() == digest { - debug!("QS: serving batch digest = {}", digest); - let state = self.digest_to_state.remove(&digest).unwrap(); - state.serve_request(digest, Some(payload)); - } else { - debug!("Payload does not fit digest") - } - } + request_state.serve_request(digest, None); + }); } } diff --git a/consensus/src/quorum_store/batch_store.rs b/consensus/src/quorum_store/batch_store.rs index 27bb22898a13c..3c1e8424d38c8 100644 --- a/consensus/src/quorum_store/batch_store.rs +++ b/consensus/src/quorum_store/batch_store.rs @@ -1,38 +1,42 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::quorum_store_db::QuorumStoreStorage; use crate::{ network::QuorumStoreSender, quorum_store::{ - batch_reader::{BatchReader, BatchReaderCommand}, - counters, - proof_coordinator::ProofCoordinatorCommand, - types::{Batch, PersistedValue}, + batch_requester::BatchRequester, counters, quorum_store_db::QuorumStoreStorage, + types::PersistedValue, utils::RoundExpirations, }, }; -// use aptos_logger::spawn_named; +use anyhow::bail; use aptos_consensus_types::{ common::Round, - proof_of_store::{LogicalTime, SignedDigest}, + proof_of_store::{LogicalTime, ProofOfStore, SignedDigest}, }; use aptos_crypto::HashValue; +use aptos_executor_types::Error; use aptos_logger::debug; use aptos_types::{ transaction::SignedTransaction, validator_signer::ValidatorSigner, 
validator_verifier::ValidatorVerifier, PeerId, }; +use dashmap::{ + mapref::entry::Entry::{Occupied, Vacant}, + DashMap, +}; +use fail::fail_point; +use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; -use std::sync::Arc; -use tokio::sync::{ - mpsc::{Receiver, Sender}, - oneshot, +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, Mutex, }; +use tokio::sync::oneshot; #[derive(Clone, Debug, Deserialize, Serialize)] pub struct PersistRequest { - digest: HashValue, - value: PersistedValue, + pub digest: HashValue, + pub value: PersistedValue, } impl PersistRequest { @@ -50,232 +54,413 @@ impl PersistRequest { } } -#[derive(Debug)] -pub(crate) enum BatchStoreCommand { - Persist(PersistRequest), - BatchRequest( - HashValue, - PeerId, - Option, aptos_executor_types::Error>>>, - ), - Clean(Vec), - Shutdown(oneshot::Sender<()>), +#[derive(PartialEq)] +enum StorageMode { + PersistedOnly, + MemoryAndPersisted, +} + +struct QuotaManager { + memory_balance: usize, + db_balance: usize, + memory_quota: usize, + db_quota: usize, +} + +impl QuotaManager { + fn new(max_db_balance: usize, max_memory_balance: usize) -> Self { + assert!(max_db_balance >= max_memory_balance); + Self { + memory_balance: 0, + db_balance: 0, + memory_quota: max_memory_balance, + db_quota: max_db_balance, + } + } + + pub(crate) fn update_quota(&mut self, num_bytes: usize) -> anyhow::Result { + if self.memory_balance + num_bytes <= self.memory_quota { + self.memory_balance += num_bytes; + self.db_balance += num_bytes; + Ok(StorageMode::MemoryAndPersisted) + } else if self.db_balance + num_bytes <= self.db_quota { + self.db_balance += num_bytes; + Ok(StorageMode::PersistedOnly) + } else { + counters::EXCEEDED_STORAGE_QUOTA_COUNT.inc(); + bail!("Storage quota exceeded "); + } + } + + pub(crate) fn free_quota(&mut self, num_bytes: usize, storage_mode: StorageMode) { + match storage_mode { + StorageMode::PersistedOnly => { + self.db_balance -= num_bytes; + }, + StorageMode::MemoryAndPersisted => { + self.memory_balance -= num_bytes; + self.db_balance -= num_bytes; + }, + } + } +} + +fn payload_storage_mode(persisted_value: &PersistedValue) -> StorageMode { + match persisted_value.maybe_payload { + Some(_) => StorageMode::MemoryAndPersisted, + None => StorageMode::PersistedOnly, + } } -pub(crate) struct BatchStore { - epoch: u64, - my_peer_id: PeerId, - network_sender: T, - batch_reader: Arc, +/// Provides in memory representation of stored batches (strong cache), and allows +/// efficient concurrent readers. +pub struct BatchStore { + epoch: OnceCell, + last_certified_round: AtomicU64, + db_cache: DashMap, + peer_quota: DashMap, + expirations: Mutex>, db: Arc, - validator_signer: Arc, + batch_expiry_round_gap_when_init: Round, + batch_expiry_round_gap_behind_latest_certified: Round, + batch_expiry_round_gap_beyond_latest_certified: Round, + expiry_grace_rounds: Round, + memory_quota: usize, + db_quota: usize, + batch_requester: BatchRequester, + validator_signer: ValidatorSigner, + validator_verifier: ValidatorVerifier, } -// TODO: send config to reduce number of arguments? 
-#[allow(clippy::too_many_arguments)] impl BatchStore { - pub fn new( + pub(crate) fn new( epoch: u64, - last_committed_round: Round, - my_peer_id: PeerId, - network_sender: T, - batch_store_tx: Sender, - batch_reader_tx: Sender, - batch_reader_rx: Receiver, + last_certified_round: Round, db: Arc, - validator_verifier: ValidatorVerifier, - validator_signer: Arc, batch_expiry_round_gap_when_init: Round, batch_expiry_round_gap_behind_latest_certified: Round, batch_expiry_round_gap_beyond_latest_certified: Round, - batch_expiry_grace_rounds: Round, - batch_request_num_peers: usize, - batch_request_timeout_ms: usize, + expiry_grace_rounds: Round, memory_quota: usize, db_quota: usize, - ) -> (Self, Arc) { - let db_content = db.get_all_batches().expect("failed to read data from db"); - debug!("QS: db size {}", db_content.len()); - - let (batch_reader, expired_keys) = BatchReader::new( - epoch, - last_committed_round, - db_content, - my_peer_id, - batch_store_tx, - batch_reader_tx, + batch_requester: BatchRequester, + validator_signer: ValidatorSigner, + validator_verifier: ValidatorVerifier, + ) -> Self { + let db_clone = db.clone(); + let batch_store = Self { + epoch: OnceCell::with_value(epoch), + last_certified_round: AtomicU64::new(last_certified_round), + db_cache: DashMap::new(), + peer_quota: DashMap::new(), + expirations: Mutex::new(RoundExpirations::new()), + db, batch_expiry_round_gap_when_init, batch_expiry_round_gap_behind_latest_certified, batch_expiry_round_gap_beyond_latest_certified, - batch_expiry_grace_rounds, + expiry_grace_rounds, memory_quota, db_quota, + batch_requester, + validator_signer, + validator_verifier, + }; + let db_content = db_clone + .get_all_batches() + .expect("failed to read data from db"); + let mut expired_keys = Vec::new(); + debug!( + "QS: Batchreader {} {} {}", + db_content.len(), + epoch, + last_certified_round ); - if let Err(e) = db.delete_batches(expired_keys) { - debug!("Error deleting batches: {:?}", e) + for (digest, value) in db_content { + let expiration = value.expiration; + + debug!( + "QS: Batchreader recovery content exp {:?}, digest {}", + expiration, digest + ); + assert!(epoch >= expiration.epoch()); + + if epoch > expiration.epoch() + || last_certified_round >= expiration.round() + expiry_grace_rounds + { + expired_keys.push(digest); + } else { + batch_store + .update_cache(digest, value) + .expect("Storage limit exceeded upon BatchReader construction"); + } } - let batch_reader_clone = batch_reader.clone(); - let net = network_sender.clone(); - let metrics_monitor = tokio_metrics::TaskMonitor::new(); - - tokio::spawn(async move { - metrics_monitor - .instrument(batch_reader_clone.start( - batch_reader_rx, - net, - batch_request_num_peers, - batch_request_timeout_ms, - validator_verifier, - )) - .await - }); - - let batch_reader_clone = batch_reader.clone(); - ( - Self { - epoch, - my_peer_id, - network_sender, - batch_reader, - db, - validator_signer, - }, - batch_reader_clone, - ) + debug!( + "QS: Batchreader recovery expired keys len {}", + expired_keys.len() + ); + db_clone.delete_batches(expired_keys).unwrap(); + + batch_store + } + + fn epoch(&self) -> u64 { + *self.epoch.get().unwrap() + } + + fn free_quota(&self, persisted_value: PersistedValue) { + let mut quota_manager = self + .peer_quota + .get_mut(&persisted_value.author) + .expect("No QuotaManager for batch author"); + quota_manager.free_quota( + persisted_value.num_bytes, + payload_storage_mode(&persisted_value), + ); + } + + // Return an error if storage quota is 
exceeded. + fn update_cache(&self, digest: HashValue, mut value: PersistedValue) -> anyhow::Result<()> { + let author = value.author; + if self + .peer_quota + .entry(author) + .or_insert(QuotaManager::new(self.db_quota, self.memory_quota)) + .update_quota(value.num_bytes)? + == StorageMode::PersistedOnly + { + value.remove_payload(); + } + + let expiration_round = value.expiration.round(); + if let Some(prev_value) = self.db_cache.insert(digest, value) { + self.free_quota(prev_value); + } + self.expirations + .lock() + .unwrap() + .add_item(digest, expiration_round); + Ok(()) } - fn store(&self, persist_request: PersistRequest) -> Option { - debug!("QS: store"); + pub(crate) fn save(&self, digest: HashValue, value: PersistedValue) -> anyhow::Result { + if value.expiration.epoch() == self.epoch() { + // record the round gaps + if value.expiration.round() > self.last_certified_round() { + counters::GAP_BETWEEN_BATCH_EXPIRATION_AND_LAST_CERTIFIED_ROUND_HIGHER + .observe((value.expiration.round() - self.last_certified_round()) as f64); + } + if value.expiration.round() < self.last_certified_round() { + counters::GAP_BETWEEN_BATCH_EXPIRATION_AND_LAST_CERTIFIED_ROUND_LOWER + .observe((self.last_certified_round() - value.expiration.round()) as f64); + } + + if value.expiration.round() + self.batch_expiry_round_gap_behind_latest_certified + >= self.last_certified_round() + && value.expiration.round() + <= self.last_certified_round() + + self.batch_expiry_round_gap_beyond_latest_certified + { + fail_point!("quorum_store::save", |_| { + // Skip caching and storing value to the db + Ok(false) + }); + + if let Some(entry) = self.db_cache.get(&digest) { + if entry.expiration.round() >= value.expiration.round() { + debug!("QS: already have the digest with higher expiration"); + return Ok(false); + } + } + self.update_cache(digest, value)?; + return Ok(true); + } + } + bail!("Incorrect expiration {:?} with init gap {} in epoch {}, last committed round {} and max behind gap {} max beyond gap {}", + value.expiration, + self.batch_expiry_round_gap_when_init, + self.epoch(), + self.last_certified_round(), + self.batch_expiry_round_gap_behind_latest_certified, + self.batch_expiry_round_gap_beyond_latest_certified); + } + + fn clear_expired_payload(&self, certified_time: LogicalTime) -> Vec { + assert_eq!( + certified_time.epoch(), + self.epoch(), + "Execution epoch inconsistent with BatchReader" + ); + + let expired_round = if certified_time.round() >= self.expiry_grace_rounds { + certified_time.round() - self.expiry_grace_rounds + } else { + 0 + }; + + let expired_digests = self.expirations.lock().unwrap().expire(expired_round); + let mut ret = Vec::new(); + for h in expired_digests { + let removed_value = match self.db_cache.entry(h) { + Occupied(entry) => { + // We need to check up-to-date expiration again because receiving the same + // digest with a higher expiration would update the persisted value and + // effectively extend the expiration. + if entry.get().expiration.round() <= expired_round { + Some(entry.remove()) + } else { + None + } + }, + Vacant(_) => unreachable!("Expired entry not in cache"), + }; + // No longer holding the lock on db_cache entry. + if let Some(value) = removed_value { + self.free_quota(value); + ret.push(h); + } + } + ret + } + + pub fn persist(&self, persist_request: PersistRequest) -> Option { let expiration = persist_request.value.expiration; // Network listener should filter messages with wrong expiration epoch. 
assert_eq!( expiration.epoch(), - self.epoch, + self.epoch(), "Persist Request for a batch with an incorrect epoch" ); - match self - .batch_reader - .save(persist_request.digest, persist_request.value.clone()) // TODO: what is this comes from old epoch? - { + + match self.save(persist_request.digest, persist_request.value.clone()) { Ok(needs_db) => { - let batch_author = persist_request.value.author; let num_txns = persist_request.value.maybe_payload.as_ref().unwrap().len() as u64; let num_bytes = persist_request.value.num_bytes as u64; + let batch_author = persist_request.value.author; debug!("QS: sign digest"); if needs_db { - // TODO: Consider an async call to DB, but it could be a race with clean. self.db .save_batch(persist_request.digest, persist_request.value) .expect("Could not write to DB"); } - Some(SignedDigest::new( - batch_author, - self.epoch, - persist_request.digest, - expiration, - num_txns, - num_bytes, - self.validator_signer.clone(), - ).unwrap()) - } + Some( + SignedDigest::new( + batch_author, + self.epoch(), + persist_request.digest, + expiration, + num_txns, + num_bytes, + &self.validator_signer, + ) + .unwrap(), + ) + }, Err(e) => { debug!("QS: failed to store to cache {:?}", e); None - } + }, } } - pub async fn start( - self, - mut batch_store_rx: Receiver, - proof_coordinator_tx: Sender, - ) { - debug!( - "[QS worker] BatchStore worker for epoch {} starting", - self.epoch + // TODO: make sure state-sync also sends the message, or execution cleans. + // When self.expiry_grace_rounds == 0, certified time contains a round for + // which execution result has been certified by a quorum, and as such, the + // batches with expiration in this round can be cleaned up. The parameter + // expiry grace rounds just keeps the batches around for a little longer + // for lagging nodes to be able to catch up (without state-sync). 
+ pub async fn update_certified_round(&self, certified_time: LogicalTime) { + debug!("QS: batch reader updating time {:?}", certified_time); + assert_eq!( + self.epoch(), + certified_time.epoch(), + "QS: wrong epoch {} != {}", + self.epoch(), + certified_time.epoch() ); - while let Some(command) = batch_store_rx.recv().await { - match command { - BatchStoreCommand::Shutdown(ack_tx) => { - self.batch_reader.shutdown().await; - ack_tx - .send(()) - .expect("Failed to send shutdown ack to QuorumStore"); - break; - }, - BatchStoreCommand::Persist(persist_request) => { - let author = persist_request.value.author; - if let Some(signed_digest) = self.store(persist_request) { - if self.my_peer_id == author { - proof_coordinator_tx - .send(ProofCoordinatorCommand::AppendSignature(signed_digest)) - .await - .expect("Failed to send to ProofBuilder"); - debug!("QS: sent signed digest to ProofBuilder"); - } else { - // Quorum store metrics - counters::DELIVERED_BATCHES_COUNT.inc(); - - self.network_sender - .send_signed_digest(signed_digest, vec![author]) - .await; - debug!("QS: sent signed digest back to sender"); - } - } - }, - BatchStoreCommand::Clean(digests) => { - if let Err(e) = self.db.delete_batches(digests) { - debug!("Error deleting batches: {:?}", e) - } - }, - BatchStoreCommand::BatchRequest(digest, peer_id, maybe_tx) => { - counters::GET_BATCH_FROM_DB_COUNT.inc(); - - match self.db.get_batch(digest) { - Ok(Some(persisted_value)) => { - let payload = persisted_value - .maybe_payload - .expect("Persisted value in QuorumStore DB must have payload"); - match maybe_tx { - Some(payload_tx) => { - assert_eq!( - self.my_peer_id, peer_id, - "Return channel must be to self" - ); - if payload_tx.send(Ok(payload)).is_err() { - debug!( - "Failed to send PersistedValue for digest {}", - digest - ); - } - }, - None => { - assert_ne!( - self.my_peer_id, peer_id, - "Request from self without return channel" - ); - let batch = - Batch::new(self.my_peer_id, self.epoch, digest, payload); - self.network_sender.send_batch(batch, vec![peer_id]).await; - }, - } - }, - Ok(None) => unreachable!( - "Could not read persisted value (according to BatchReader) from DB" - ), - Err(_) => { - // TODO: handle error, e.g. from self or not, log, panic. - }, - } - }, + let prev_round = self + .last_certified_round + .fetch_max(certified_time.round(), Ordering::SeqCst); + // Note: prev_round may be equal to certified_time round due to state-sync + // at the epoch boundary. + assert!( + prev_round <= certified_time.round(), + "Decreasing executed rounds reported to BatchReader {} {}", + prev_round, + certified_time.round(), + ); + + let expired_keys = self.clear_expired_payload(certified_time); + if let Err(e) = self.db.delete_batches(expired_keys) { + debug!("Error deleting batches: {:?}", e) + } + } + + fn last_certified_round(&self) -> Round { + self.last_certified_round.load(Ordering::Relaxed) + } + + fn get_batch_from_db(&self, digest: &HashValue) -> Result, Error> { + counters::GET_BATCH_FROM_DB_COUNT.inc(); + + match self.db.get_batch(digest) { + Ok(Some(persisted_value)) => { + let payload = persisted_value + .maybe_payload + .expect("Persisted value in QuorumStore DB must have payload"); + return Ok(payload); + }, + Ok(None) => { + unreachable!("Could not read persisted value (according to BatchReader) from DB") + }, + Err(_) => { + // TODO: handle error, e.g. from self or not, log, panic. 
+ }, + } + Err(Error::CouldNotGetData) + } + + pub fn get_batch_from_local( + &self, + digest: &HashValue, + ) -> Result, Error> { + if let Some(value) = self.db_cache.get(digest) { + if payload_storage_mode(&value) == StorageMode::PersistedOnly { + assert!( + value.maybe_payload.is_none(), + "BatchReader payload and storage kind mismatch" + ); + self.get_batch_from_db(digest) + } else { + // Available in memory. + Ok(value + .maybe_payload + .clone() + .expect("BatchReader payload and storage kind mismatch")) } + } else { + Err(Error::CouldNotGetData) } + } - debug!( - "[QS worker] BatchStore worker for epoch {} stopping", - self.epoch - ); + pub fn get_batch( + &self, + proof: ProofOfStore, + ) -> oneshot::Receiver, Error>> { + let (tx, rx) = oneshot::channel(); + + if let Ok(value) = self.get_batch_from_local(proof.digest()) { + tx.send(Ok(value)).unwrap(); + } else { + // Quorum store metrics + counters::MISSED_BATCHES_COUNT.inc(); + self.batch_requester.request_batch( + *proof.digest(), + proof.shuffled_signers(&self.validator_verifier), + tx, + ); + } + rx } } diff --git a/consensus/src/quorum_store/counters.rs b/consensus/src/quorum_store/counters.rs index dde5e57d51757..494fcd24267e9 100644 --- a/consensus/src/quorum_store/counters.rs +++ b/consensus/src/quorum_store/counters.rs @@ -3,8 +3,8 @@ use aptos_metrics_core::{ exponential_buckets, op_counters::DurationHistogram, register_histogram, - register_histogram_vec, register_int_counter, AverageIntCounter, Histogram, HistogramVec, - IntCounter, + register_histogram_vec, register_int_counter, register_int_counter_vec, AverageIntCounter, + Histogram, HistogramVec, IntCounter, IntCounterVec, }; use once_cell::sync::Lazy; use std::time::Duration; @@ -373,6 +373,16 @@ pub static SENT_BATCH_REQUEST_RETRY_COUNT: Lazy = Lazy::new(|| { .unwrap() }); +/// Counters(queued,dequeued,dropped) related to batch retrieval per epoch task +pub static BATCH_RETRIEVAL_TASK_MSGS: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "aptos_quorum_store_batch_retrieval_task_msgs_count", + "Counters(queued,dequeued,dropped) related to batch retrieval task", + &["state"] + ) + .unwrap() +}); + /// Count of the number of batch request received from other nodes. 
pub static RECEIVED_BATCH_REQUEST_COUNT: Lazy = Lazy::new(|| { register_int_counter!( diff --git a/consensus/src/quorum_store/mod.rs b/consensus/src/quorum_store/mod.rs index 7ee634ee4d567..cee56195cc46c 100644 --- a/consensus/src/quorum_store/mod.rs +++ b/consensus/src/quorum_store/mod.rs @@ -8,7 +8,6 @@ pub mod direct_mempool_quorum_store; pub(crate) mod batch_aggregator; pub(crate) mod batch_coordinator; pub(crate) mod batch_generator; -pub(crate) mod batch_reader; pub(crate) mod batch_requester; pub(crate) mod batch_store; pub(crate) mod network_listener; diff --git a/consensus/src/quorum_store/network_listener.rs b/consensus/src/quorum_store/network_listener.rs index 9144db43d9252..8f9b7a385cb44 100644 --- a/consensus/src/quorum_store/network_listener.rs +++ b/consensus/src/quorum_store/network_listener.rs @@ -3,7 +3,7 @@ use crate::{ quorum_store::{ - batch_coordinator::BatchCoordinatorCommand, batch_reader::BatchReaderCommand, counters, + batch_coordinator::BatchCoordinatorCommand, counters, proof_coordinator::ProofCoordinatorCommand, proof_manager::ProofManagerCommand, }, round_manager::VerifiedEvent, @@ -16,7 +16,6 @@ use tokio::sync::mpsc::Sender; pub(crate) struct NetworkListener { network_msg_rx: aptos_channel::Receiver, - batch_reader_tx: Sender, proof_coordinator_tx: Sender, remote_batch_coordinator_tx: Vec>, proof_manager_tx: Sender, @@ -25,14 +24,12 @@ pub(crate) struct NetworkListener { impl NetworkListener { pub(crate) fn new( network_msg_rx: aptos_channel::Receiver, - batch_reader_tx: Sender, proof_coordinator_tx: Sender, remote_batch_coordinator_tx: Vec>, proof_manager_tx: Sender, ) -> Self { Self { network_msg_rx, - batch_reader_tx, proof_coordinator_tx, remote_batch_coordinator_tx, proof_manager_tx, @@ -78,34 +75,6 @@ impl NetworkListener { .await .expect("Could not send remote fragment"); }, - VerifiedEvent::BatchRequestMsg(request) => { - counters::RECEIVED_BATCH_REQUEST_COUNT.inc(); - debug!( - "QS: batch request from {:?} digest {}", - request.source(), - request.digest() - ); - let cmd = - BatchReaderCommand::GetBatchForPeer(request.digest(), request.source()); - self.batch_reader_tx - .send(cmd) - .await - .expect("could not push Batch batch_reader"); - }, - VerifiedEvent::UnverifiedBatchMsg(batch) => { - counters::RECEIVED_BATCH_COUNT.inc(); - debug!( - "QS: batch response from {:?} digest {}", - batch.source(), - batch.digest() - ); - let cmd = - BatchReaderCommand::BatchResponse(batch.digest(), batch.into_payload()); - self.batch_reader_tx - .send(cmd) - .await - .expect("could not push Batch batch_reader"); - }, VerifiedEvent::ProofOfStoreMsg(proof) => { counters::REMOTE_POS_COUNT.inc(); let cmd = ProofManagerCommand::RemoteProof(*proof); diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index d77b0de94d87e..3560d34bdb4bb 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -3,18 +3,22 @@ use super::quorum_store_db::QuorumStoreStorage; use crate::{ - network::NetworkSender, + error::error_kind, + network::{IncomingBatchRetrievalRequest, NetworkSender}, + network_interface::ConsensusMsg, payload_manager::PayloadManager, quorum_store::{ batch_coordinator::{BatchCoordinator, BatchCoordinatorCommand}, batch_generator::{BatchGenerator, BatchGeneratorCommand}, - batch_reader::{BatchReader, BatchReaderCommand}, - batch_store::{BatchStore, BatchStoreCommand}, + batch_requester::BatchRequester, + batch_store::BatchStore, + 
counters, direct_mempool_quorum_store::DirectMempoolQuorumStore, network_listener::NetworkListener, proof_coordinator::{ProofCoordinator, ProofCoordinatorCommand}, proof_manager::{ProofManager, ProofManagerCommand}, quorum_store_coordinator::{CoordinatorCommand, QuorumStoreCoordinator}, + types::Batch, }, round_manager::VerifiedEvent, }; @@ -31,6 +35,7 @@ use aptos_types::{ account_address::AccountAddress, validator_signer::ValidatorSigner, validator_verifier::ValidatorVerifier, }; +use futures::StreamExt; use futures_channel::mpsc::{Receiver, Sender}; use std::{sync::Arc, time::Duration}; @@ -52,7 +57,12 @@ impl QuorumStoreBuilder { } } - pub fn start(self) -> Option> { + pub fn start( + self, + ) -> Option<( + Sender, + aptos_channel::Sender, + )> { match self { QuorumStoreBuilder::DirectMempool(inner) => { inner.start(); @@ -123,10 +133,6 @@ pub struct InnerBuilder { proof_coordinator_cmd_rx: Option>, proof_manager_cmd_tx: tokio::sync::mpsc::Sender, proof_manager_cmd_rx: Option>, - batch_store_cmd_tx: tokio::sync::mpsc::Sender, - batch_store_cmd_rx: Option>, - batch_reader_cmd_tx: tokio::sync::mpsc::Sender, - batch_reader_cmd_rx: Option>, back_pressure_tx: tokio::sync::mpsc::Sender, back_pressure_rx: Option>, quorum_store_storage: Arc, @@ -134,6 +140,7 @@ pub struct InnerBuilder { quorum_store_msg_rx: Option>, remote_batch_coordinator_cmd_tx: Vec>, remote_batch_coordinator_cmd_rx: Vec>, + batch_store: Option>>, } impl InnerBuilder { @@ -159,10 +166,6 @@ impl InnerBuilder { tokio::sync::mpsc::channel(config.channel_size); let (proof_manager_cmd_tx, proof_manager_cmd_rx) = tokio::sync::mpsc::channel(config.channel_size); - let (batch_store_cmd_tx, batch_store_cmd_rx) = - tokio::sync::mpsc::channel(config.channel_size); - let (batch_reader_cmd_tx, batch_reader_cmd_rx) = - tokio::sync::mpsc::channel(config.channel_size); let (back_pressure_tx, back_pressure_rx) = tokio::sync::mpsc::channel(config.channel_size); let (quorum_store_msg_tx, quorum_store_msg_rx) = aptos_channel::new::( @@ -200,10 +203,6 @@ impl InnerBuilder { proof_coordinator_cmd_rx: Some(proof_coordinator_cmd_rx), proof_manager_cmd_tx, proof_manager_cmd_rx: Some(proof_manager_cmd_rx), - batch_store_cmd_tx, - batch_store_cmd_rx: Some(batch_store_cmd_rx), - batch_reader_cmd_tx, - batch_reader_cmd_rx: Some(batch_reader_cmd_rx), back_pressure_tx, back_pressure_rx: Some(back_pressure_rx), quorum_store_storage, @@ -211,10 +210,11 @@ impl InnerBuilder { quorum_store_msg_rx: Some(quorum_store_msg_rx), remote_batch_coordinator_cmd_tx, remote_batch_coordinator_cmd_rx, + batch_store: None, } } - fn spawn_quorum_store(&mut self) -> Arc { + fn create_batch_store(&mut self) -> Arc> { let backend = &self.backend; let storage: Storage = backend.try_into().expect("Unable to initialize storage"); if let Err(error) = storage.available() { @@ -244,37 +244,38 @@ impl InnerBuilder { 0 }; - let batch_store_cmd_rx = self.batch_store_cmd_rx.take().unwrap(); - let batch_reader_cmd_rx = self.batch_reader_cmd_rx.take().unwrap(); - let (batch_store, batch_reader) = BatchStore::new( + let batch_requester = BatchRequester::new( self.epoch, - last_committed_round, self.author, + self.config.batch_request_num_peers, + self.config.batch_request_timeout_ms, self.network_sender.clone(), - self.batch_store_cmd_tx.clone(), - self.batch_reader_cmd_tx.clone(), - batch_reader_cmd_rx, + ); + let batch_store = Arc::new(BatchStore::new( + self.epoch, + last_committed_round, self.quorum_store_storage.clone(), - self.verifier.clone(), - Arc::new(signer), 
self.config.batch_expiry_round_gap_when_init, self.config.batch_expiry_round_gap_behind_latest_certified, self.config.batch_expiry_round_gap_beyond_latest_certified, self.config.batch_expiry_grace_rounds, - self.config.batch_request_num_peers, - self.config.batch_request_timeout_ms, self.config.memory_quota, self.config.db_quota, - ); - spawn_named!( - "batch_store", - batch_store.start(batch_store_cmd_rx, self.proof_coordinator_cmd_tx.clone()) - ); + batch_requester, + signer, + self.verifier.clone(), + )); + self.batch_store = Some(batch_store.clone()); - batch_reader + batch_store } - fn spawn_quorum_store_wrapper(mut self) -> Sender { + fn spawn_quorum_store( + mut self, + ) -> ( + Sender, + aptos_channel::Sender, + ) { // TODO: parameter? bring back back-off? let interval = tokio::time::interval(Duration::from_millis( self.config.mempool_pulling_interval as u64, @@ -288,7 +289,6 @@ impl InnerBuilder { self.remote_batch_coordinator_cmd_tx.clone(), self.proof_coordinator_cmd_tx.clone(), self.proof_manager_cmd_tx.clone(), - self.batch_store_cmd_tx.clone(), self.quorum_store_msg_tx.clone(), ); spawn_named!( @@ -317,7 +317,7 @@ impl InnerBuilder { self.author, self.network_sender.clone(), batch_coordinator_cmd_rx, - self.batch_store_cmd_tx.clone(), + self.batch_store.clone().unwrap(), self.proof_coordinator_cmd_tx.clone(), self.config.max_batch_bytes, ); @@ -331,7 +331,7 @@ impl InnerBuilder { self.author, self.network_sender.clone(), remote_batch_coordinator_cmd_rx, - self.batch_store_cmd_tx.clone(), + self.batch_store.clone().unwrap(), self.proof_coordinator_cmd_tx.clone(), self.config.max_batch_bytes, ); @@ -369,14 +369,42 @@ impl InnerBuilder { let network_msg_rx = self.quorum_store_msg_rx.take().unwrap(); let net = NetworkListener::new( network_msg_rx, - self.batch_reader_cmd_tx.clone(), self.proof_coordinator_cmd_tx.clone(), self.remote_batch_coordinator_cmd_tx.clone(), self.proof_manager_cmd_tx.clone(), ); spawn_named!("network_listener", net.start()); - self.coordinator_tx + let batch_store = self.batch_store.clone().unwrap(); + let author = self.author; + let epoch = self.epoch; + let (batch_retrieval_tx, mut batch_retrieval_rx) = + aptos_channel::new::( + QueueStyle::LIFO, + 10, + Some(&counters::BATCH_RETRIEVAL_TASK_MSGS), + ); + spawn_named!("batch_serve", async move { + info!(epoch = epoch, "Batch retrieval task starts"); + while let Some(rpc_request) = batch_retrieval_rx.next().await { + counters::RECEIVED_BATCH_REQUEST_COUNT.inc(); + if let Ok(value) = batch_store.get_batch_from_local(&rpc_request.req.digest()) { + let batch = Batch::new(author, epoch, rpc_request.req.digest(), value); + let msg = ConsensusMsg::BatchMsg(Box::new(batch)); + let bytes = rpc_request.protocol.to_bytes(&msg).unwrap(); + if let Err(e) = rpc_request + .response_sender + .send(Ok(bytes.into())) + .map_err(|_| anyhow::anyhow!("Failed to send block retrieval response")) + { + warn!(epoch = epoch, error = ?e, kind = error_kind(&e)); + } + } + } + info!(epoch = epoch, "Batch retrieval task stops"); + }); + + (self.coordinator_tx, batch_retrieval_tx) } fn init_payload_manager( @@ -385,11 +413,11 @@ impl InnerBuilder { Arc, Option>, ) { - let batch_reader = self.spawn_quorum_store(); + let batch_store = self.create_batch_store(); ( Arc::from(PayloadManager::InQuorumStore( - batch_reader, + batch_store, // TODO: remove after splitting out clean requests Mutex::new(self.coordinator_tx.clone()), )), @@ -397,7 +425,12 @@ impl InnerBuilder { ) } - fn start(self) -> Sender { - self.spawn_quorum_store_wrapper() 
+ fn start( + self, + ) -> ( + Sender, + aptos_channel::Sender, + ) { + self.spawn_quorum_store() } } diff --git a/consensus/src/quorum_store/quorum_store_coordinator.rs b/consensus/src/quorum_store/quorum_store_coordinator.rs index ac64a6bca8da3..d829e8b9b6f3f 100644 --- a/consensus/src/quorum_store/quorum_store_coordinator.rs +++ b/consensus/src/quorum_store/quorum_store_coordinator.rs @@ -4,8 +4,7 @@ use crate::{ quorum_store::{ batch_coordinator::BatchCoordinatorCommand, batch_generator::BatchGeneratorCommand, - batch_store::BatchStoreCommand, proof_coordinator::ProofCoordinatorCommand, - proof_manager::ProofManagerCommand, + proof_coordinator::ProofCoordinatorCommand, proof_manager::ProofManagerCommand, }, round_manager::VerifiedEvent, }; @@ -29,7 +28,6 @@ pub struct QuorumStoreCoordinator { remote_batch_coordinator_cmd_tx: Vec>, proof_coordinator_cmd_tx: mpsc::Sender, proof_manager_cmd_tx: mpsc::Sender, - batch_store_cmd_tx: mpsc::Sender, quorum_store_msg_tx: aptos_channel::Sender, } @@ -41,7 +39,6 @@ impl QuorumStoreCoordinator { remote_batch_coordinator_cmd_tx: Vec>, proof_coordinator_cmd_tx: mpsc::Sender, proof_manager_cmd_tx: mpsc::Sender, - batch_store_cmd_tx: mpsc::Sender, quorum_store_msg_tx: aptos_channel::Sender, ) -> Self { Self { @@ -51,7 +48,6 @@ impl QuorumStoreCoordinator { remote_batch_coordinator_cmd_tx, proof_coordinator_cmd_tx, proof_manager_cmd_tx, - batch_store_cmd_tx, quorum_store_msg_tx, } } @@ -133,15 +129,6 @@ impl QuorumStoreCoordinator { .expect("Failed to stop Remote BatchCoordinator"); } - let (batch_store_shutdown_tx, batch_store_shutdown_rx) = oneshot::channel(); - self.batch_store_cmd_tx - .send(BatchStoreCommand::Shutdown(batch_store_shutdown_tx)) - .await - .expect("Failed to send to BatchStore"); - batch_store_shutdown_rx - .await - .expect("Failed to stop BatchStore"); - let (proof_coordinator_shutdown_tx, proof_coordinator_shutdown_rx) = oneshot::channel(); self.proof_coordinator_cmd_tx diff --git a/consensus/src/quorum_store/quorum_store_db.rs b/consensus/src/quorum_store/quorum_store_db.rs index f4adb8d8336bf..6f1f24803743d 100644 --- a/consensus/src/quorum_store/quorum_store_db.rs +++ b/consensus/src/quorum_store/quorum_store_db.rs @@ -21,7 +21,7 @@ pub(crate) trait QuorumStoreStorage: Sync + Send { fn save_batch(&self, digest: HashValue, batch: PersistedValue) -> Result<(), DbError>; - fn get_batch(&self, digest: HashValue) -> Result, DbError>; + fn get_batch(&self, digest: &HashValue) -> Result, DbError>; fn delete_batch_id(&self, epoch: u64) -> Result<(), DbError>; @@ -84,8 +84,8 @@ impl QuorumStoreStorage for QuorumStoreDB { Ok(self.db.put::(&digest, &batch)?) } - fn get_batch(&self, digest: HashValue) -> Result, DbError> { - Ok(self.db.get::(&digest)?) + fn get_batch(&self, digest: &HashValue) -> Result, DbError> { + Ok(self.db.get::(digest)?) 
} fn delete_batch_id(&self, epoch: u64) -> Result<(), DbError> { @@ -139,7 +139,7 @@ impl QuorumStoreStorage for MockQuorumStoreDB { Ok(()) } - fn get_batch(&self, _: HashValue) -> Result, DbError> { + fn get_batch(&self, _: &HashValue) -> Result, DbError> { Ok(None) } diff --git a/consensus/src/quorum_store/tests/batch_reader_test.rs b/consensus/src/quorum_store/tests/batch_reader_test.rs deleted file mode 100644 index caa00331ef193..0000000000000 --- a/consensus/src/quorum_store/tests/batch_reader_test.rs +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::quorum_store::{batch_reader::BatchReader, types::PersistedValue}; -use aptos_consensus_types::proof_of_store::LogicalTime; -use aptos_crypto::HashValue; -use aptos_types::account_address::AccountAddress; -use futures::executor::block_on; -use std::{ - collections::HashMap, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, -}; -use tokio::{sync::mpsc::channel, task::spawn_blocking}; - -#[tokio::test(flavor = "multi_thread")] -async fn test_extend_expiration_vs_save() { - let num_experiments = 2000; - - let (tx1, _rx1) = channel(num_experiments + 50); - let (self_tx, _rx2) = channel(1); // Shouldn't send anything to self in this test. - let (batch_reader, _) = BatchReader::new( - 10, // epoch - 10, // last committed round - HashMap::new(), // db recovery state - AccountAddress::random(), // self peer id - tx1, // batch store sender - self_tx, // self sender - 0, - 0, - 2100, - 0, // grace period rounds - 0, // memory_quota - 1000, // db quota - ); - - let batch_reader_clone1 = batch_reader.clone(); - let batch_reader_clone2 = batch_reader.clone(); - - let digests: Vec = (0..num_experiments).map(|_| HashValue::random()).collect(); - let later_exp_values: Vec<(HashValue, PersistedValue)> = (0..num_experiments) - .map(|i| { - // Pre-insert some of them. - if i % 2 == 0 { - batch_reader - .save( - digests[i], - PersistedValue::new( - Some(Vec::new()), - LogicalTime::new(10, i as u64 + 30), - AccountAddress::random(), - 10, - ), - ) - .unwrap(); - } - - ( - digests[i], - PersistedValue::new( - Some(Vec::new()), - LogicalTime::new(10, i as u64 + 40), - AccountAddress::random(), - 10, - ), - ) - }) - .collect(); - - // Marshal threads to start at the same time. - let start_flag = Arc::new(AtomicUsize::new(0)); - let start_clone1 = start_flag.clone(); - let start_clone2 = start_flag.clone(); - - // Thread that extends expiration by saving. - spawn_blocking(move || { - for (i, (digest, later_exp_value)) in later_exp_values.into_iter().enumerate() { - // Wait until both threads are ready for next experiment. - loop { - let flag_val = start_clone1.load(Ordering::Acquire); - if flag_val == 3 * i + 1 || flag_val == 3 * i + 2 { - break; - } - } - - batch_reader_clone1.save(digest, later_exp_value).unwrap(); - start_clone1.fetch_add(1, Ordering::Relaxed); - } - }); - - // Thread that expires. - spawn_blocking(move || { - for i in 0..num_experiments { - // Wait until both threads are ready for next experiment. - loop { - let flag_val = start_clone2.load(Ordering::Acquire); - if flag_val == 3 * i + 1 || flag_val == 3 * i + 2 { - break; - } - } - - block_on( - batch_reader_clone2.update_certified_round(LogicalTime::new(10, i as u64 + 30)), - ); - start_clone2.fetch_add(1, Ordering::Relaxed); - } - }); - - for (i, &digest) in digests.iter().enumerate().take(num_experiments) { - // Set the conditions for experiment (both threads waiting). 
- while start_flag.load(Ordering::Acquire) % 3 != 0 {} - - if i % 2 == 1 { - batch_reader - .save( - digest, - PersistedValue::new( - Some(Vec::new()), - LogicalTime::new(10, i as u64 + 30), - AccountAddress::random(), - 10, - ), - ) - .unwrap(); - } - - // Unleash the threads. - start_flag.fetch_add(1, Ordering::Relaxed); - } - // Finish the experiment - while start_flag.load(Ordering::Acquire) % 3 != 0 {} - - // Expire everything, call for higher times as well. - for i in 35..50 { - batch_reader - .update_certified_round(LogicalTime::new(10, (i + num_experiments) as u64)) - .await; - } -} - -// TODO: last certified round. -// TODO: check correct digests are returned. -// TODO: check grace period. -// TODO: check quota. -// TODO: check the channels. diff --git a/consensus/src/quorum_store/tests/batch_requester_test.rs b/consensus/src/quorum_store/tests/batch_requester_test.rs deleted file mode 100644 index d287ea838137d..0000000000000 --- a/consensus/src/quorum_store/tests/batch_requester_test.rs +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::{ - network_interface::ConsensusMsg, - quorum_store::{ - batch_requester::BatchRequester, - tests::utils::{compute_digest_from_signed_transaction, create_vec_signed_transactions}, - types::BatchRequest, - }, - test_utils::mock_quorum_store_sender::MockQuorumStoreSender, -}; -use aptos_types::account_address::AccountAddress; -use claims::{assert_err, assert_some}; -use tokio::sync::{mpsc::channel, oneshot}; - -#[tokio::test(flavor = "multi_thread")] -async fn test_batch_requester() { - let (tx, mut rx) = channel(100); - let sender = MockQuorumStoreSender::new(tx); - let epoch = 1; - let id = AccountAddress::random(); - let request_num_peers = 3; - let request_timeout_ms = 0; - let mut batch_requester = - BatchRequester::new(epoch, id, request_num_peers, request_timeout_ms, sender); - - let signed_transactions = create_vec_signed_transactions(100); - let digest = compute_digest_from_signed_transaction(signed_transactions.clone()); - let (oneshot_tx, oneshot_rx) = oneshot::channel(); - let mut signers = Vec::new(); - - for _ in 1..10 { - signers.push(AccountAddress::random()); - } - - batch_requester - .add_request(digest, signers, oneshot_tx) - .await; - let res = rx.recv().await; - assert_some!(res.clone()); - let (msg, signers) = res.unwrap(); - match msg { - ConsensusMsg::BatchRequestMsg(request) => { - assert_eq!(*request, BatchRequest::new(id, epoch, digest)) - }, - _ => unreachable!(), - } - assert_eq!(signers.len(), 3); - - batch_requester.serve_request(digest, signed_transactions.clone()); - assert_eq!( - oneshot_rx.await.expect("sender dropped"), - Ok(signed_transactions) - ); - - // test timeout logic - let signed_transactions = create_vec_signed_transactions(200); - let digest = compute_digest_from_signed_transaction(signed_transactions.clone()); - let (oneshot_tx, oneshot_rx) = oneshot::channel(); - batch_requester - .add_request(digest, signers, oneshot_tx) - .await; - batch_requester.handle_timeouts().await; - assert_some!(rx.recv().await); - batch_requester.handle_timeouts().await; - assert_some!(rx.recv().await); - batch_requester.handle_timeouts().await; - assert_some!(rx.recv().await); - batch_requester.handle_timeouts().await; - assert_some!(rx.recv().await); - batch_requester.handle_timeouts().await; - assert_err!(oneshot_rx.await.unwrap()); -} diff --git a/consensus/src/quorum_store/tests/batch_store_test.rs b/consensus/src/quorum_store/tests/batch_store_test.rs 
index 2c2b3b6e1674a..3b7d839ee95b3 100644 --- a/consensus/src/quorum_store/tests/batch_store_test.rs +++ b/consensus/src/quorum_store/tests/batch_store_test.rs @@ -2,189 +2,159 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - network_interface::ConsensusMsg, quorum_store::{ - batch_reader::{BatchReader, BatchReaderCommand}, - batch_store::{BatchStore, BatchStoreCommand, PersistRequest}, - proof_coordinator::ProofCoordinatorCommand, - quorum_store_db::QuorumStoreDB, - tests::utils::{compute_digest_from_signed_transaction, create_vec_signed_transactions}, - types::SerializedTransaction, + batch_requester::BatchRequester, batch_store::BatchStore, quorum_store_db::QuorumStoreDB, + types::PersistedValue, }, test_utils::mock_quorum_store_sender::MockQuorumStoreSender, }; -use aptos_config::config::QuorumStoreConfig; -use aptos_consensus_types::{common::Author, proof_of_store::LogicalTime}; +use aptos_consensus_types::proof_of_store::LogicalTime; use aptos_crypto::HashValue; -use aptos_logger::spawn_named; use aptos_temppath::TempPath; -use aptos_types::{ - transaction::SignedTransaction, - validator_signer::ValidatorSigner, - validator_verifier::{random_validator_verifier, ValidatorVerifier}, - PeerId, +use aptos_types::{account_address::AccountAddress, validator_verifier::random_validator_verifier}; +use futures::executor::block_on; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, }; -use std::sync::Arc; - -struct TestBatchStore { - pub batch_store: BatchStore, - pub _batch_reader: Arc, - pub network_rx: tokio::sync::mpsc::Receiver<(ConsensusMsg, Vec)>, - pub batch_store_cmd_tx: tokio::sync::mpsc::Sender, - pub batch_store_cmd_rx: tokio::sync::mpsc::Receiver, - pub batch_reader_cmd_tx: tokio::sync::mpsc::Sender, -} +use tokio::{sync::mpsc::channel, task::spawn_blocking}; -fn start_batch_store( - signer: ValidatorSigner, - db_path: &TempPath, - validator_verifier: &ValidatorVerifier, -) -> TestBatchStore { - let config = QuorumStoreConfig::default(); - let (network_tx, network_rx) = tokio::sync::mpsc::channel(100); - let network_sender = MockQuorumStoreSender::new(network_tx); - - let (batch_store_cmd_tx, batch_store_cmd_rx) = tokio::sync::mpsc::channel(100); - let (batch_reader_cmd_tx, batch_reader_cmd_rx) = tokio::sync::mpsc::channel(100); - let (batch_store, _batch_reader) = BatchStore::new( +#[tokio::test(flavor = "multi_thread")] +async fn test_extend_expiration_vs_save() { + let num_experiments = 2000; + let tmp_dir = TempPath::new(); + let db = Arc::new(QuorumStoreDB::new(&tmp_dir)); + let (tx, _rx) = channel(10); + let requester = BatchRequester::new( + 10, + AccountAddress::random(), + 1, + 1, + MockQuorumStoreSender::new(tx), + ); + let (signers, validator_verifier) = random_validator_verifier(4, None, false); + + let batch_store = Arc::new(BatchStore::new( + 10, // epoch + 10, // last committed round + db, 0, 0, - signer.author(), - network_sender, - batch_store_cmd_tx.clone(), - batch_reader_cmd_tx.clone(), - batch_reader_cmd_rx, - Arc::new(QuorumStoreDB::new(db_path)), - validator_verifier.clone(), - Arc::new(signer), - config.batch_expiry_round_gap_when_init, - config.batch_expiry_round_gap_behind_latest_certified, - config.batch_expiry_round_gap_beyond_latest_certified, - config.batch_expiry_grace_rounds, - config.batch_request_num_peers, - config.batch_request_timeout_ms, - config.memory_quota, - config.db_quota, - ); - TestBatchStore { - batch_store, - _batch_reader, - network_rx, - batch_store_cmd_tx, - batch_store_cmd_rx, - batch_reader_cmd_tx, - } -} + 
2100, + 0, // grace period rounds + 0, // memory_quota + 1000, // db quota + requester, + signers[0].clone(), + validator_verifier, + )); -async fn get_batch_for_peer_and_check( - network_rx: &mut tokio::sync::mpsc::Receiver<(ConsensusMsg, Vec)>, - batch_reader_cmd_tx: tokio::sync::mpsc::Sender, - digest_hash: HashValue, - remote_peer_id: PeerId, - expected_txns: &[SignedTransaction], -) { - let cmd = BatchReaderCommand::GetBatchForPeer(digest_hash, remote_peer_id); - batch_reader_cmd_tx.send(cmd).await.expect("Could not send"); - let (msg, peer_ids) = network_rx.recv().await.expect("Could not recv"); - assert_eq!(peer_ids.len(), 1); - assert_eq!(peer_ids[0], remote_peer_id); - match msg { - ConsensusMsg::BatchMsg(batch) => { - let txns = batch.into_payload(); - assert_eq!(txns.len(), expected_txns.len()); - for (txn, expected_txn) in txns.iter().zip(expected_txns) { - assert_eq!(txn, expected_txn); + let batch_store_clone1 = batch_store.clone(); + let batch_store_clone2 = batch_store.clone(); + + let digests: Vec = (0..num_experiments).map(|_| HashValue::random()).collect(); + let later_exp_values: Vec<(HashValue, PersistedValue)> = (0..num_experiments) + .map(|i| { + // Pre-insert some of them. + if i % 2 == 0 { + batch_store + .save( + digests[i], + PersistedValue::new( + Some(Vec::new()), + LogicalTime::new(10, i as u64 + 30), + AccountAddress::random(), + 10, + ), + ) + .unwrap(); } - }, - _ => panic!("Unexpected msg {:?}", msg), - } -} -async fn shutdown(batch_store_cmd_tx: tokio::sync::mpsc::Sender) { - let (tx, rx) = tokio::sync::oneshot::channel(); - let cmd = BatchStoreCommand::Shutdown(tx); - batch_store_cmd_tx.send(cmd).await.expect("Could not send"); - rx.await.expect("Could not shutdown"); -} + ( + digests[i], + PersistedValue::new( + Some(Vec::new()), + LogicalTime::new(10, i as u64 + 40), + AccountAddress::random(), + 10, + ), + ) + }) + .collect(); -#[ignore] // TODO: debug and re-enable before deploying quorum store -#[tokio::test(flavor = "multi_thread")] -async fn test_batch_store_recovery() { - let tmp_dir = TempPath::new(); - let txns = create_vec_signed_transactions(100); - let digest_hash = compute_digest_from_signed_transaction(txns.clone()); - let num_bytes = txns - .iter() - .map(SerializedTransaction::from_signed_txn) - .map(|t| t.len()) - .sum(); - let (signers, validator_verifier) = random_validator_verifier(4, None, false); - let peer_id = signers[0].author(); - let remote_peer_id = signers[1].author(); - - { - let test_batch_store = start_batch_store(signers[0].clone(), &tmp_dir, &validator_verifier); - let (proof_coordinator_tx, mut proof_coordinator_rx) = tokio::sync::mpsc::channel(100); - spawn_named!( - "batch store", - test_batch_store - .batch_store - .start(test_batch_store.batch_store_cmd_rx, proof_coordinator_tx) - ); - - // Persist batch and wait - let cmd = BatchStoreCommand::Persist(PersistRequest::new( - peer_id, - txns.clone(), - digest_hash, - num_bytes, - LogicalTime::new(0, 100), - )); - test_batch_store - .batch_store_cmd_tx - .clone() - .send(cmd) - .await - .expect("Could not send"); - let msg = proof_coordinator_rx.recv().await.expect("Could not recv"); - match msg { - ProofCoordinatorCommand::AppendSignature(digest) => { - assert_eq!(digest.digest(), digest_hash); - }, - _ => panic!("Unexpected msg {:?}", msg), + // Marshal threads to start at the same time. + let start_flag = Arc::new(AtomicUsize::new(0)); + let start_clone1 = start_flag.clone(); + let start_clone2 = start_flag.clone(); + + // Thread that extends expiration by saving. 
+ spawn_blocking(move || { + for (i, (digest, later_exp_value)) in later_exp_values.into_iter().enumerate() { + // Wait until both threads are ready for next experiment. + loop { + let flag_val = start_clone1.load(Ordering::Acquire); + if flag_val == 3 * i + 1 || flag_val == 3 * i + 2 { + break; + } + } + + batch_store_clone1.save(digest, later_exp_value).unwrap(); + start_clone1.fetch_add(1, Ordering::Relaxed); } + }); + + // Thread that expires. + spawn_blocking(move || { + for i in 0..num_experiments { + // Wait until both threads are ready for next experiment. + loop { + let flag_val = start_clone2.load(Ordering::Acquire); + if flag_val == 3 * i + 1 || flag_val == 3 * i + 2 { + break; + } + } - let mut network_rx = test_batch_store.network_rx; - get_batch_for_peer_and_check( - &mut network_rx, - test_batch_store.batch_reader_cmd_tx.clone(), - digest_hash, - remote_peer_id, - &txns, - ) - .await; - shutdown(test_batch_store.batch_store_cmd_tx.clone()).await; + block_on( + batch_store_clone2.update_certified_round(LogicalTime::new(10, i as u64 + 30)), + ); + start_clone2.fetch_add(1, Ordering::Relaxed); + } + }); + + for (i, &digest) in digests.iter().enumerate().take(num_experiments) { + // Set the conditions for experiment (both threads waiting). + while start_flag.load(Ordering::Acquire) % 3 != 0 {} + + if i % 2 == 1 { + batch_store + .save( + digest, + PersistedValue::new( + Some(Vec::new()), + LogicalTime::new(10, i as u64 + 30), + AccountAddress::random(), + 10, + ), + ) + .unwrap(); + } + + // Unleash the threads. + start_flag.fetch_add(1, Ordering::Relaxed); } + // Finish the experiment + while start_flag.load(Ordering::Acquire) % 3 != 0 {} - { - let test_batch_store = start_batch_store(signers[0].clone(), &tmp_dir, &validator_verifier); - let (proof_coordinator_tx, _proof_coordinator_rx) = tokio::sync::mpsc::channel(100); - spawn_named!( - "batch store restart", - test_batch_store - .batch_store - .start(test_batch_store.batch_store_cmd_rx, proof_coordinator_tx) - ); - - let mut network_rx = test_batch_store.network_rx; - get_batch_for_peer_and_check( - &mut network_rx, - test_batch_store.batch_reader_cmd_tx.clone(), - digest_hash, - remote_peer_id, - &txns, - ) - .await; - shutdown(test_batch_store.batch_store_cmd_tx.clone()).await; + // Expire everything, call for higher times as well. + for i in 35..50 { + batch_store + .update_certified_round(LogicalTime::new(10, (i + num_experiments) as u64)) + .await; } } + +// TODO: last certified round. +// TODO: check correct digests are returned. +// TODO: check grace period. +// TODO: check quota. +// TODO: check the channels. 
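
The hunks above replace the old `BatchStoreCommand`/`BatchReaderCommand` channel plumbing with direct calls on a shared `Arc<BatchStore>`: a reader first checks the in-memory cache, falls back to the quorum store DB for persisted-only entries, and only asks peers for the batch when neither holds it. The sketch below illustrates that lookup order only; all names (`BatchStoreSketch`, `LocalDb`, `PeerRequester`, the `u8`-array digest) are simplified stand-ins and not the actual aptos types.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

type Digest = [u8; 32];
type Payload = Vec<u8>;

/// Batches evicted from memory but still held on disk (stand-in for the quorum store DB).
struct LocalDb {
    persisted: HashMap<Digest, Payload>,
}

/// Stand-in for the batch requester: the real one drives timed RPCs to the proof signers.
struct PeerRequester {
    outstanding: Vec<(Digest, Sender<Result<Payload, String>>)>,
}

struct BatchStoreSketch {
    memory_cache: HashMap<Digest, Payload>,
    db: LocalDb,
    requester: PeerRequester,
}

impl BatchStoreSketch {
    /// Local lookup order: in-memory copy first, then the persisted copy.
    fn get_batch_from_local(&self, digest: &Digest) -> Option<Payload> {
        self.memory_cache
            .get(digest)
            .cloned()
            .or_else(|| self.db.persisted.get(digest).cloned())
    }

    /// Answer immediately when the batch is local; otherwise hand back a receiver
    /// that the peer requester will eventually fulfill (or fail).
    fn get_batch(&mut self, digest: Digest) -> Receiver<Result<Payload, String>> {
        let (tx, rx) = channel();
        if let Some(payload) = self.get_batch_from_local(&digest) {
            tx.send(Ok(payload)).expect("receiver is alive");
        } else {
            self.requester.outstanding.push((digest, tx));
        }
        rx
    }
}

fn main() {
    let mut store = BatchStoreSketch {
        memory_cache: HashMap::from([([1u8; 32], b"txns".to_vec())]),
        db: LocalDb { persisted: HashMap::new() },
        requester: PeerRequester { outstanding: Vec::new() },
    };
    // A cached batch is returned straight away; an unknown digest would be
    // queued on the requester instead.
    let rx = store.get_batch([1u8; 32]);
    assert_eq!(rx.recv().unwrap().unwrap(), b"txns".to_vec());
    assert!(store.requester.outstanding.is_empty());
}
```

The same order is what the rewritten `test_extend_expiration_vs_save` above exercises concurrently: saves that extend expiration race against `update_certified_round` calls that expire and delete persisted batches.
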
diff --git a/consensus/src/quorum_store/tests/mod.rs b/consensus/src/quorum_store/tests/mod.rs index 25bdf34e1e97d..2058dc5001220 100644 --- a/consensus/src/quorum_store/tests/mod.rs +++ b/consensus/src/quorum_store/tests/mod.rs @@ -4,8 +4,6 @@ #[cfg(test)] mod batch_aggregator_test; mod batch_generator_test; -mod batch_reader_test; -mod batch_requester_test; mod batch_store_test; mod direct_mempool_quorum_store_test; mod proof_coordinator_test; diff --git a/consensus/src/quorum_store/tests/proof_coordinator_test.rs b/consensus/src/quorum_store/tests/proof_coordinator_test.rs index 5691108b95b50..4c7e4357c4dd7 100644 --- a/consensus/src/quorum_store/tests/proof_coordinator_test.rs +++ b/consensus/src/quorum_store/tests/proof_coordinator_test.rs @@ -9,18 +9,13 @@ use crate::quorum_store::{ types::BatchId, }; use aptos_consensus_types::proof_of_store::{LogicalTime, SignedDigest, SignedDigestInfo}; -use aptos_types::{ - validator_signer::ValidatorSigner, validator_verifier::random_validator_verifier, -}; +use aptos_types::validator_verifier::random_validator_verifier; use futures::channel::oneshot; -use std::sync::Arc; use tokio::sync::mpsc::{channel, error::TryRecvError}; #[tokio::test(flavor = "multi_thread")] async fn test_proof_coordinator_basic() { let (signers, verifier) = random_validator_verifier(4, None, true); - let arc_signers: Vec> = - signers.clone().into_iter().map(Arc::new).collect(); let proof_coordinator = ProofCoordinator::new(100, signers[0].author()); let (proof_coordinator_tx, proof_coordinator_rx) = channel(100); let (proof_manager_tx, mut proof_manager_rx) = channel(100); @@ -40,7 +35,7 @@ async fn test_proof_coordinator_basic() { )) .await .is_ok()); - for arc_signer in &arc_signers { + for signer in &signers { let signed_digest = SignedDigest::new( batch_author, 1, @@ -48,7 +43,7 @@ async fn test_proof_coordinator_basic() { LogicalTime::new(1, 20), 1, 1, - arc_signer.clone(), + signer, ) .unwrap(); assert!(proof_coordinator_tx @@ -96,7 +91,7 @@ async fn test_proof_coordinator_basic() { )) .await .is_ok()); - for arc_signer in &arc_signers { + for signer in &signers { let signed_digest = SignedDigest::new( batch_author, 1, @@ -104,7 +99,7 @@ async fn test_proof_coordinator_basic() { LogicalTime::new(1, 20), 1, 1, - arc_signer.clone(), + signer, ) .unwrap(); assert!(proof_coordinator_tx @@ -131,7 +126,7 @@ async fn test_proof_coordinator_basic() { )) .await .is_ok()); - for _ in 0..arc_signers.len() { + for _ in 0..signers.len() { let signed_digest = SignedDigest::new( batch_author, 1, @@ -139,7 +134,7 @@ async fn test_proof_coordinator_basic() { LogicalTime::new(1, 20), 1, 1, - arc_signers[1].clone(), + &signers[1], ) .unwrap(); assert!(proof_coordinator_tx diff --git a/consensus/src/quorum_store/tests/quorum_store_db_test.rs b/consensus/src/quorum_store/tests/quorum_store_db_test.rs index d6598ed672541..12e9fed37d4b4 100644 --- a/consensus/src/quorum_store/tests/quorum_store_db_test.rs +++ b/consensus/src/quorum_store/tests/quorum_store_db_test.rs @@ -22,7 +22,7 @@ fn test_db_for_data() { assert!(db.save_batch(digest_1, value_1.clone()).is_ok()); assert_eq!( - db.get_batch(digest_1) + db.get_batch(&digest_1) .expect("could not read from db") .unwrap(), value_1 @@ -41,7 +41,7 @@ fn test_db_for_data() { let batches = vec![digest_3]; assert!(db.delete_batches(batches).is_ok()); assert_eq!( - db.get_batch(digest_3).expect("could not read from db"), + db.get_batch(&digest_3).expect("could not read from db"), None ); diff --git 
a/consensus/src/quorum_store/tests/types_test.rs b/consensus/src/quorum_store/tests/types_test.rs index 2de88ff55aa8e..439e7537f3c30 100644 --- a/consensus/src/quorum_store/tests/types_test.rs +++ b/consensus/src/quorum_store/tests/types_test.rs @@ -29,7 +29,7 @@ fn test_batch() { let batch = Batch::new(source, epoch, digest, signed_txns.clone()); - assert!(batch.verify(source).is_ok()); + assert!(batch.verify().is_ok()); assert_eq!(batch.into_payload(), signed_txns); } diff --git a/consensus/src/quorum_store/types.rs b/consensus/src/quorum_store/types.rs index fa53771d367a3..e096195370f3a 100644 --- a/consensus/src/quorum_store/types.rs +++ b/consensus/src/quorum_store/types.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use aptos_consensus_types::proof_of_store::LogicalTime; -use aptos_crypto::HashValue; +use aptos_crypto::{hash::DefaultHasher, HashValue}; use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher}; use aptos_types::{transaction::SignedTransaction, PeerId}; use bcs::to_bytes; @@ -93,11 +93,11 @@ impl SerializedTransaction { } #[derive(Clone, Eq, Deserialize, Serialize, PartialEq, Debug)] -pub(crate) struct PersistedValue { - pub(crate) maybe_payload: Option>, - pub(crate) expiration: LogicalTime, - pub(crate) author: PeerId, - pub(crate) num_bytes: usize, +pub struct PersistedValue { + pub maybe_payload: Option>, + pub expiration: LogicalTime, + pub author: PeerId, + pub num_bytes: usize, } impl PersistedValue { @@ -310,16 +310,18 @@ impl Batch { self.batch_info.epoch } - // Check the source == the sender. To protect from DDoS we check is Payload matches digest later. - pub fn verify(&self, peer_id: PeerId) -> anyhow::Result<()> { - if self.source == peer_id { + pub fn verify(&self) -> anyhow::Result<()> { + let mut hasher = DefaultHasher::new(b"QuorumStoreBatch"); + let serialized_payload: Vec = self + .payload + .iter() + .flat_map(|txn| to_bytes(txn).unwrap()) + .collect(); + hasher.update(&serialized_payload); + if hasher.finish() == self.digest() { Ok(()) } else { - Err(anyhow::anyhow!( - "Sender mismatch: peer_id: {}, source: {}", - self.source, - peer_id - )) + Err(anyhow::anyhow!("Digest doesn't match")) } } diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index 9b2d627c8de6c..c45963efe5683 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -22,7 +22,7 @@ use crate::{ network_interface::ConsensusMsg, pending_votes::VoteReceptionResult, persistent_liveness_storage::PersistentLivenessStorage, - quorum_store::types::{Batch, BatchRequest, Fragment}, + quorum_store::types::Fragment, }; use anyhow::{bail, ensure, Context, Result}; use aptos_channels::aptos_channel; @@ -69,8 +69,6 @@ pub enum UnverifiedEvent { CommitVote(Box), CommitDecision(Box), FragmentMsg(Box), - BatchRequestMsg(Box), - BatchMsg(Box), SignedDigestMsg(Box), ProofOfStoreMsg(Box), } @@ -108,15 +106,6 @@ impl UnverifiedEvent { f.verify(peer_id)?; VerifiedEvent::FragmentMsg(f) }, - UnverifiedEvent::BatchRequestMsg(br) => { - br.verify(peer_id)?; - VerifiedEvent::BatchRequestMsg(br) - }, - // Only sender is verified. Remaining verification is on-demand (when it's used). 
- UnverifiedEvent::BatchMsg(b) => { - b.verify(peer_id)?; - VerifiedEvent::UnverifiedBatchMsg(b) - }, UnverifiedEvent::SignedDigestMsg(sd) => { sd.verify(validator)?; VerifiedEvent::SignedDigestMsg(sd) @@ -136,8 +125,6 @@ impl UnverifiedEvent { UnverifiedEvent::CommitVote(cv) => cv.epoch(), UnverifiedEvent::CommitDecision(cd) => cd.epoch(), UnverifiedEvent::FragmentMsg(f) => f.epoch(), - UnverifiedEvent::BatchRequestMsg(br) => br.epoch(), - UnverifiedEvent::BatchMsg(b) => b.epoch(), UnverifiedEvent::SignedDigestMsg(sd) => sd.epoch(), UnverifiedEvent::ProofOfStoreMsg(p) => p.epoch(), } @@ -153,8 +140,6 @@ impl From for UnverifiedEvent { ConsensusMsg::CommitVoteMsg(m) => UnverifiedEvent::CommitVote(m), ConsensusMsg::CommitDecisionMsg(m) => UnverifiedEvent::CommitDecision(m), ConsensusMsg::FragmentMsg(m) => UnverifiedEvent::FragmentMsg(m), - ConsensusMsg::BatchRequestMsg(m) => UnverifiedEvent::BatchRequestMsg(m), - ConsensusMsg::BatchMsg(m) => UnverifiedEvent::BatchMsg(m), ConsensusMsg::SignedDigestMsg(m) => UnverifiedEvent::SignedDigestMsg(m), ConsensusMsg::ProofOfStoreMsg(m) => UnverifiedEvent::ProofOfStoreMsg(m), _ => unreachable!("Unexpected conversion"), @@ -172,8 +157,6 @@ pub enum VerifiedEvent { CommitVote(Box), CommitDecision(Box), FragmentMsg(Box), - BatchRequestMsg(Box), - UnverifiedBatchMsg(Box), SignedDigestMsg(Box), ProofOfStoreMsg(Box), // local messages diff --git a/consensus/src/test_utils/mock_quorum_store_sender.rs b/consensus/src/test_utils/mock_quorum_store_sender.rs index ca010fb5b09e7..b0831999e87aa 100644 --- a/consensus/src/test_utils/mock_quorum_store_sender.rs +++ b/consensus/src/test_utils/mock_quorum_store_sender.rs @@ -10,6 +10,7 @@ use aptos_consensus_types::{ common::Author, proof_of_store::{ProofOfStore, SignedDigest}, }; +use std::time::Duration; use tokio::sync::mpsc::Sender; #[derive(Clone)] @@ -32,6 +33,15 @@ impl QuorumStoreSender for MockQuorumStoreSender { .expect("could not send"); } + async fn request_batch( + &self, + _request: BatchRequest, + _recipient: Author, + _timeout: Duration, + ) -> anyhow::Result { + unimplemented!(); + } + async fn send_batch(&self, batch: Batch, recipients: Vec) { self.tx .send((ConsensusMsg::BatchMsg(Box::new(batch)), recipients)) diff --git a/consensus/src/twins/basic_twins_test.rs b/consensus/src/twins/basic_twins_test.rs index 4e13bfa6532dd..9893efebd5cae 100644 --- a/consensus/src/twins/basic_twins_test.rs +++ b/consensus/src/twins/basic_twins_test.rs @@ -192,6 +192,7 @@ fn twins_vote_dedup_test() { /// /// Run the test: /// cargo xtest -p consensus twins_proposer_test -- --nocapture +#[ignore] fn twins_proposer_test() { let runtime = consensus_runtime(); let mut playground = NetworkPlayground::new(runtime.handle().clone());
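
With batches now fetched over the dedicated RPC path, `Batch::verify` no longer checks that the message sender matches the batch's recorded source; it recomputes the digest from the serialized payload and compares it with the digest the batch claims, so a batch served by any peer can be validated against the digest the requester already holds. Below is a minimal sketch of that content check under simplified assumptions: std's `DefaultHasher` stands in for the cryptographic hasher used in the real code, and `BatchSketch` is an illustrative type, not the aptos one.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

struct BatchSketch {
    payload: Vec<Vec<u8>>, // already-serialized transactions
    digest: u64,           // claimed digest (u64 only because of the stand-in hasher)
}

/// Recompute the digest over the concatenated serialized transactions.
fn compute_digest(payload: &[Vec<u8>]) -> u64 {
    let mut hasher = DefaultHasher::new();
    for txn in payload {
        hasher.write(txn);
    }
    hasher.finish()
}

impl BatchSketch {
    /// Accept the batch only if its payload hashes to the claimed digest.
    fn verify(&self) -> Result<(), String> {
        if compute_digest(&self.payload) == self.digest {
            Ok(())
        } else {
            Err("digest doesn't match payload".to_string())
        }
    }
}

fn main() {
    let payload = vec![b"txn-1".to_vec(), b"txn-2".to_vec()];
    let batch = BatchSketch {
        digest: compute_digest(&payload),
        payload,
    };
    assert!(batch.verify().is_ok());
}
```

This is also why `BatchRequestMsg`/`BatchMsg` drop out of `UnverifiedEvent` above: responses arrive on the RPC channel and are checked against the requested digest, rather than being verified by sender identity on the broadcast path.
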