From 52740b0c1bc7ff2efe39e585d4f5b638731b4a17 Mon Sep 17 00:00:00 2001
From: Zekun Li
Date: Sat, 25 Feb 2023 19:08:38 -0800
Subject: [PATCH] [quorum store] refactor batch_reader/store and batch
requester
This commit merges batch_reader and batch_store into a single struct and uses it as a synchronous component (via direct function calls)
instead of an asynchronous one (via command channels).
The batch_requester is also refactored to fetch missing batches directly over network RPC, rather than routing BatchRequestMsg/BatchMsg through the quorum store network listener.
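A rough sketch of the new request path (hypothetical names and simplified types; digest
verification, metrics and peer selection via BatchRequesterState::next_request_peers are
omitted): the requester now drives its own retries on a spawned task instead of relying on
DigestTimeouts and the network listener. The example assumes the async-trait, futures and
anyhow crates.

    use std::time::Duration;

    use futures::{stream::FuturesUnordered, StreamExt};

    type PeerId = u64;
    type Payload = Vec<u8>;

    /// Stand-in for the request_batch RPC added to QuorumStoreSender.
    #[async_trait::async_trait]
    trait BatchFetcher: Send + Sync {
        async fn request_batch(&self, peer: PeerId, timeout: Duration) -> anyhow::Result<Payload>;
    }

    /// Fan the request out to `group_size` signers at a time; return the first
    /// successful response, fall through to the next group on failure, and give
    /// up once every signer has been tried.
    async fn fetch_with_retries<F: BatchFetcher>(
        fetcher: &F,
        mut signers: Vec<PeerId>,
        group_size: usize,
        timeout: Duration,
    ) -> Option<Payload> {
        while !signers.is_empty() {
            let take = group_size.clamp(1, signers.len());
            let group: Vec<PeerId> = signers.drain(..take).collect();
            // Send the RPC to every peer in the group and poll responses as they arrive.
            let mut in_flight: FuturesUnordered<_> = group
                .into_iter()
                .map(|peer| fetcher.request_batch(peer, timeout))
                .collect();
            while let Some(response) = in_flight.next().await {
                if let Ok(payload) = response {
                    return Some(payload);
                }
            }
        }
        None
    }

In the real code below, the payload is only served after Batch::verify() succeeds, and each
failed round increments SENT_BATCH_REQUEST_RETRY_COUNT before the next group of signers is tried.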
---
.../consensus-types/src/proof_of_store.rs | 3 +-
consensus/src/epoch_manager.rs | 47 ++-
consensus/src/network.rs | 62 +++-
consensus/src/network_tests.rs | 6 +-
consensus/src/payload_manager.rs | 14 +-
.../src/quorum_store/batch_coordinator.rs | 59 ++--
consensus/src/quorum_store/batch_reader.rs | 308 ++++++++----------
consensus/src/quorum_store/batch_requester.rs | 99 ++----
consensus/src/quorum_store/batch_store.rs | 262 +--------------
.../src/quorum_store/network_listener.rs | 33 +-
.../src/quorum_store/quorum_store_builder.rs | 114 ++++---
.../quorum_store/quorum_store_coordinator.rs | 15 +-
consensus/src/quorum_store/quorum_store_db.rs | 4 +-
.../quorum_store/tests/batch_reader_test.rs | 49 ++-
.../tests/batch_requester_test.rs | 74 -----
.../quorum_store/tests/batch_store_test.rs | 190 -----------
consensus/src/quorum_store/tests/mod.rs | 2 -
.../tests/proof_coordinator_test.rs | 19 +-
.../tests/quorum_store_db_test.rs | 4 +-
.../src/quorum_store/tests/types_test.rs | 2 +-
consensus/src/quorum_store/types.rs | 30 +-
consensus/src/round_manager.rs | 19 +-
.../test_utils/mock_quorum_store_sender.rs | 10 +
consensus/src/twins/basic_twins_test.rs | 2 +
types/src/on_chain_config/consensus_config.rs | 2 +-
25 files changed, 447 insertions(+), 982 deletions(-)
delete mode 100644 consensus/src/quorum_store/tests/batch_requester_test.rs
delete mode 100644 consensus/src/quorum_store/tests/batch_store_test.rs
diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs
index dc3056f9e92a1..d4f2fa603dcd7 100644
--- a/consensus/consensus-types/src/proof_of_store.rs
+++ b/consensus/consensus-types/src/proof_of_store.rs
@@ -11,7 +11,6 @@ use aptos_types::{
};
use rand::{seq::SliceRandom, thread_rng};
use serde::{Deserialize, Serialize};
-use std::sync::Arc;
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub struct LogicalTime {
@@ -78,7 +77,7 @@ impl SignedDigest {
expiration: LogicalTime,
num_txns: u64,
num_bytes: u64,
- validator_signer: Arc<ValidatorSigner>,
+ validator_signer: &ValidatorSigner,
) -> Result {
let info = SignedDigestInfo::new(batch_author, digest, expiration, num_txns, num_bytes);
let signature = validator_signer.sign(&info)?;
diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs
index 1d413ba212e0f..538eb866bee6c 100644
--- a/consensus/src/epoch_manager.rs
+++ b/consensus/src/epoch_manager.rs
@@ -29,13 +29,17 @@ use crate::{
logging::{LogEvent, LogSchema},
metrics_safety_rules::MetricsSafetyRules,
monitor,
- network::{IncomingBlockRetrievalRequest, NetworkReceivers, NetworkSender},
+ network::{
+ IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest, NetworkReceivers,
+ NetworkSender,
+ },
network_interface::{ConsensusMsg, ConsensusNetworkClient},
payload_client::QuorumStoreClient,
persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
quorum_store::{
quorum_store_builder::{DirectMempoolInnerBuilder, InnerBuilder, QuorumStoreBuilder},
quorum_store_coordinator::CoordinatorCommand,
+ quorum_store_db::QuorumStoreDB,
},
recovery_manager::RecoveryManager,
round_manager::{RoundManager, UnverifiedEvent, VerifiedEvent},
@@ -80,7 +84,6 @@ use std::{
cmp::Ordering,
collections::HashMap,
mem::{discriminant, Discriminant},
- path::PathBuf,
sync::Arc,
time::Duration,
};
@@ -124,9 +127,11 @@ pub struct EpochManager {
epoch_state: Option,
block_retrieval_tx:
Option>,
- quorum_store_storage_path: PathBuf,
quorum_store_msg_tx: Option>,
quorum_store_coordinator_tx: Option>,
+ quorum_store_db: Arc<QuorumStoreDB>,
+ batch_retrieval_tx:
+ Option<aptos_channel::Sender<AccountAddress, IncomingBatchRetrievalRequest>>,
}
impl EpochManager {
@@ -145,6 +150,7 @@ impl EpochManager {
let config = node_config.consensus.clone();
let sr_config = &node_config.consensus.safety_rules;
let safety_rules_manager = SafetyRulesManager::new(sr_config);
+ let quorum_store_db = Arc::new(QuorumStoreDB::new(node_config.storage.dir()));
Self {
author,
config,
@@ -165,9 +171,10 @@ impl EpochManager {
round_manager_close_tx: None,
epoch_state: None,
block_retrieval_tx: None,
- quorum_store_storage_path: node_config.storage.dir(),
quorum_store_msg_tx: None,
quorum_store_coordinator_tx: None,
+ quorum_store_db,
+ batch_retrieval_tx: None,
}
}
@@ -538,6 +545,7 @@ impl EpochManager {
// Shutdown the block retrieval task by dropping the sender
self.block_retrieval_tx = None;
+ self.batch_retrieval_tx = None;
if let Some(mut quorum_store_coordinator_tx) = self.quorum_store_coordinator_tx.take() {
let (ack_tx, ack_rx) = oneshot::channel();
@@ -644,7 +652,7 @@ impl EpochManager {
network_sender.clone(),
epoch_state.verifier.clone(),
self.config.safety_rules.backend.clone(),
- self.quorum_store_storage_path.clone(),
+ self.quorum_store_db.clone(),
))
} else {
info!("Building DirectMempool");
@@ -689,8 +697,10 @@ impl EpochManager {
onchain_consensus_config.back_pressure_limit(),
payload_manager.clone(),
));
+ let (tx, rx) = aptos_channel::new(QueueStyle::LIFO, 1, None);
+ self.batch_retrieval_tx = Some(tx);
- self.quorum_store_coordinator_tx = quorum_store_builder.start();
+ self.quorum_store_coordinator_tx = quorum_store_builder.start(rx);
info!(epoch = epoch, "Create ProposalGenerator");
// txn manager is required both by proposal generator (to pull the proposers)
@@ -909,8 +919,6 @@ impl EpochManager {
) -> anyhow::Result<()> {
match event {
UnverifiedEvent::FragmentMsg(_)
- | UnverifiedEvent::BatchRequestMsg(_)
- | UnverifiedEvent::BatchMsg(_)
| UnverifiedEvent::SignedDigestMsg(_)
| UnverifiedEvent::ProofOfStoreMsg(_) => {
if self.quorum_store_enabled {
@@ -938,9 +946,7 @@ impl EpochManager {
);
}
match event {
- quorum_store_event @ (VerifiedEvent::BatchRequestMsg(_)
- | VerifiedEvent::UnverifiedBatchMsg(_)
- | VerifiedEvent::SignedDigestMsg(_)
+ quorum_store_event @ (VerifiedEvent::SignedDigestMsg(_)
| VerifiedEvent::ProofOfStoreMsg(_)
| VerifiedEvent::FragmentMsg(_)) => {
if let Some(sender) = &mut self.quorum_store_msg_tx {
@@ -987,6 +993,18 @@ impl EpochManager {
}
}
+ fn process_batch_retrieval(
+ &self,
+ peer_id: Author,
+ request: IncomingBatchRetrievalRequest,
+ ) -> anyhow::Result<()> {
+ if let Some(tx) = &self.batch_retrieval_tx {
+ tx.push(peer_id, request)
+ } else {
+ Err(anyhow::anyhow!("Quorum store not started"))
+ }
+ }
+
fn process_local_timeout(&mut self, round: u64) {
self.forward_to_round_manager(self.author, VerifiedEvent::LocalTimeout(round));
}
@@ -1020,11 +1038,16 @@ impl EpochManager {
error!(epoch = self.epoch(), error = ?e, kind = error_kind(&e));
}
},
- (peer, request) = network_receivers.block_retrieval.select_next_some() => {
+ (peer, request) = network_receivers.block_retrieval_rx.select_next_some() => {
if let Err(e) = self.process_block_retrieval(peer, request) {
error!(epoch = self.epoch(), error = ?e, kind = error_kind(&e));
}
},
+ (peer, request) = network_receivers.batch_retrieval_rx.select_next_some() => {
+ if let Err(e) = self.process_batch_retrieval(peer, request) {
+ error!(epoch = self.epoch(), error = ?e, kind = error_kind(&e));
+ }
+ },
round = round_timeout_sender_rx.select_next_some() => {
self.process_local_timeout(round);
},
diff --git a/consensus/src/network.rs b/consensus/src/network.rs
index f26d95465da6c..377dde6a6ebee 100644
--- a/consensus/src/network.rs
+++ b/consensus/src/network.rs
@@ -53,6 +53,13 @@ pub struct IncomingBlockRetrievalRequest {
pub response_sender: oneshot::Sender>,
}
+#[derive(Debug)]
+pub struct IncomingBatchRetrievalRequest {
+ pub req: BatchRequest,
+ pub protocol: ProtocolId,
+ pub response_sender: oneshot::Sender<Result<Bytes, RpcError>>,
+}
+
/// Just a convenience struct to keep all the network proxy receiving queues in one place.
/// Will be returned by the NetworkTask upon startup.
pub struct NetworkReceivers {
@@ -65,14 +72,23 @@ pub struct NetworkReceivers {
(AccountAddress, Discriminant),
(AccountAddress, ConsensusMsg),
>,
- pub block_retrieval:
+ pub block_retrieval_rx:
aptos_channel::Receiver,
+ pub batch_retrieval_rx:
+ aptos_channel::Receiver<AccountAddress, (AccountAddress, IncomingBatchRetrievalRequest)>,
}
#[async_trait::async_trait]
-pub(crate) trait QuorumStoreSender {
+pub trait QuorumStoreSender: Send + Clone {
async fn send_batch_request(&self, request: BatchRequest, recipients: Vec);
+ async fn request_batch(
+ &self,
+ request: BatchRequest,
+ recipient: Author,
+ timeout: Duration,
+ ) -> anyhow::Result<Batch>;
+
async fn send_batch(&self, batch: Batch, recipients: Vec);
async fn send_signed_digest(&self, signed_digest: SignedDigest, recipients: Vec);
@@ -311,6 +327,23 @@ impl QuorumStoreSender for NetworkSender {
self.send(msg, recipients).await
}
+ async fn request_batch(
+ &self,
+ request: BatchRequest,
+ recipient: Author,
+ timeout: Duration,
+ ) -> anyhow::Result<Batch> {
+ let msg = ConsensusMsg::BatchRequestMsg(Box::new(request));
+ let response = self
+ .consensus_network_client
+ .send_rpc(recipient, msg, timeout)
+ .await?;
+ match response {
+ ConsensusMsg::BatchMsg(batch) => Ok(*batch),
+ _ => Err(anyhow!("Invalid batch response")),
+ }
+ }
+
async fn send_batch(&self, batch: Batch, recipients: Vec) {
fail_point!("consensus::send::batch", |_| ());
let msg = ConsensusMsg::BatchMsg(Box::new(batch));
@@ -347,6 +380,8 @@ pub struct NetworkTask {
>,
block_retrieval_tx:
aptos_channel::Sender,
+ batch_retrieval_tx:
+ aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingBatchRetrievalRequest)>,
all_events: Box> + Send + Unpin>,
}
@@ -364,11 +399,13 @@ impl NetworkTask {
50,
Some(&counters::QUORUM_STORE_CHANNEL_MSGS),
);
- let (block_retrieval_tx, block_retrieval) = aptos_channel::new(
+ let (block_retrieval_tx, block_retrieval_rx) = aptos_channel::new(
QueueStyle::LIFO,
1,
Some(&counters::BLOCK_RETRIEVAL_CHANNEL_MSGS),
);
+ let (batch_retrieval_tx, batch_retrieval_rx) =
+ aptos_channel::new(QueueStyle::LIFO, 1, None);
// Verify the network events have been constructed correctly
let network_and_events = network_service_events.into_network_and_events();
@@ -388,12 +425,14 @@ impl NetworkTask {
consensus_messages_tx,
quorum_store_messages_tx,
block_retrieval_tx,
+ batch_retrieval_tx,
all_events,
},
NetworkReceivers {
consensus_messages,
quorum_store_messages,
- block_retrieval,
+ block_retrieval_rx,
+ batch_retrieval_rx,
},
)
}
@@ -422,10 +461,11 @@ impl NetworkTask {
.with_label_values(&[msg.name()])
.inc();
match msg {
+ ConsensusMsg::BatchRequestMsg(_) | ConsensusMsg::BatchMsg(_) => {
+ warn!("unexpected msg");
+ },
quorum_store_msg @ (ConsensusMsg::SignedDigestMsg(_)
| ConsensusMsg::FragmentMsg(_)
- | ConsensusMsg::BatchRequestMsg(_)
- | ConsensusMsg::BatchMsg(_)
| ConsensusMsg::ProofOfStoreMsg(_)) => {
Self::push_msg(
peer_id,
@@ -475,6 +515,16 @@ impl NetworkTask {
warn!(error = ?e, "aptos channel closed");
}
},
+ ConsensusMsg::BatchRequestMsg(request) => {
+ let req_with_callback = IncomingBatchRetrievalRequest {
+ req: *request,
+ protocol,
+ response_sender: callback,
+ };
+ let _ = self
+ .batch_retrieval_tx
+ .push(peer_id, (peer_id, req_with_callback));
+ },
_ => {
warn!(remote_peer = peer_id, "Unexpected msg: {:?}", msg);
continue;
diff --git a/consensus/src/network_tests.rs b/consensus/src/network_tests.rs
index adc5215d54df2..a56e05dad9ff4 100644
--- a/consensus/src/network_tests.rs
+++ b/consensus/src/network_tests.rs
@@ -749,7 +749,7 @@ mod tests {
);
// verify request block rpc
- let mut block_retrieval = receiver_1.block_retrieval;
+ let mut block_retrieval = receiver_1.block_retrieval_rx;
let on_request_block = async move {
while let Some((_, request)) = block_retrieval.next().await {
// make sure the network task is not blocked during RPC
@@ -824,13 +824,13 @@ mod tests {
.unwrap();
let f_check = async move {
- assert!(network_receivers.block_retrieval.next().await.is_some());
+ assert!(network_receivers.block_retrieval_rx.next().await.is_some());
drop(peer_mgr_notifs_tx);
drop(connection_notifs_tx);
drop(self_sender);
- assert!(network_receivers.block_retrieval.next().await.is_none());
+ assert!(network_receivers.block_retrieval_rx.next().await.is_none());
assert!(network_receivers.consensus_messages.next().await.is_none());
};
let f_network_task = network_task.start();
diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager.rs
index d50b19ab900c8..10ac4cb5ca871 100644
--- a/consensus/src/payload_manager.rs
+++ b/consensus/src/payload_manager.rs
@@ -1,8 +1,9 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0
-use crate::quorum_store::{
- batch_reader::BatchReader, quorum_store_coordinator::CoordinatorCommand,
+use crate::{
+ network::NetworkSender,
+ quorum_store::{batch_reader::BatchReader, quorum_store_coordinator::CoordinatorCommand},
};
use aptos_consensus_types::{
block::Block,
@@ -22,14 +23,17 @@ use tokio::sync::oneshot;
/// If QuorumStore is enabled, has to ask BatchReader for the transaction behind the proofs of availability in the payload.
pub enum PayloadManager {
DirectMempool,
- InQuorumStore(Arc, Mutex>),
+ InQuorumStore(
+ Arc<BatchReader<NetworkSender>>,
+ Mutex<Sender<CoordinatorCommand>>,
+ ),
}
impl PayloadManager {
async fn request_transactions(
proofs: Vec,
logical_time: LogicalTime,
- batch_reader: &BatchReader,
+ batch_reader: &BatchReader<NetworkSender>,
) -> Vec<(
HashValue,
oneshot::Receiver, aptos_executor_types::Error>>,
@@ -43,7 +47,7 @@ impl PayloadManager {
logical_time
);
if logical_time <= pos.expiration() {
- receivers.push((*pos.digest(), batch_reader.get_batch(pos).await));
+ receivers.push((*pos.digest(), batch_reader.get_batch(pos)));
} else {
debug!("QS: skipped expired pos");
}
diff --git a/consensus/src/quorum_store/batch_coordinator.rs b/consensus/src/quorum_store/batch_coordinator.rs
index b354d2cc044b3..745b983309bec 100644
--- a/consensus/src/quorum_store/batch_coordinator.rs
+++ b/consensus/src/quorum_store/batch_coordinator.rs
@@ -5,7 +5,8 @@ use crate::{
network::{NetworkSender, QuorumStoreSender},
quorum_store::{
batch_aggregator::BatchAggregator,
- batch_store::{BatchStoreCommand, PersistRequest},
+ batch_reader::BatchReader,
+ batch_store::PersistRequest,
counters,
proof_coordinator::{ProofCoordinatorCommand, ProofReturnChannel},
types::{BatchId, Fragment, SerializedTransaction},
@@ -14,7 +15,7 @@ use crate::{
use aptos_consensus_types::proof_of_store::{LogicalTime, SignedDigestInfo};
use aptos_logger::prelude::*;
use aptos_types::PeerId;
-use std::collections::HashMap;
+use std::{collections::HashMap, sync::Arc};
use tokio::sync::{
mpsc::{Receiver, Sender},
oneshot,
@@ -38,7 +39,8 @@ pub struct BatchCoordinator {
my_peer_id: PeerId,
network_sender: NetworkSender,
command_rx: Receiver,
- batch_store_tx: Sender,
+ // batch_store_tx: Sender,
+ batch_reader: Arc<BatchReader<NetworkSender>>,
proof_coordinator_tx: Sender,
max_batch_bytes: usize,
remote_batch_aggregators: HashMap,
@@ -52,7 +54,8 @@ impl BatchCoordinator {
my_peer_id: PeerId,
network_sender: NetworkSender,
wrapper_command_rx: Receiver,
- batch_store_tx: Sender,
+ batch_reader: Arc<BatchReader<NetworkSender>>,
+ // batch_store_tx: Sender,
proof_coordinator_tx: Sender,
max_batch_bytes: usize,
) -> Self {
@@ -61,7 +64,7 @@ impl BatchCoordinator {
my_peer_id,
network_sender,
command_rx: wrapper_command_rx,
- batch_store_tx,
+ batch_reader,
proof_coordinator_tx,
max_batch_bytes,
remote_batch_aggregators: HashMap::new(),
@@ -105,7 +108,7 @@ impl BatchCoordinator {
batch_id: BatchId,
expiration: LogicalTime,
proof_tx: ProofReturnChannel,
- ) -> (BatchStoreCommand, Fragment) {
+ ) -> (PersistRequest, Fragment) {
match self.local_batch_aggregator.end_batch(
batch_id,
self.local_fragment_id,
@@ -143,7 +146,7 @@ impl BatchCoordinator {
num_bytes,
expiration,
);
- (BatchStoreCommand::Persist(persist_request), fragment)
+ (persist_request, fragment)
},
Err(e) => {
unreachable!(
@@ -154,7 +157,7 @@ impl BatchCoordinator {
}
}
- async fn handle_fragment(&mut self, fragment: Fragment) {
+ async fn handle_fragment(&mut self, fragment: Fragment) -> Option<PersistRequest> {
let source = fragment.source();
let entry = self
.remote_batch_aggregators
@@ -176,13 +179,9 @@ impl BatchCoordinator {
fragment.into_transactions(),
) {
Ok((num_bytes, payload, digest)) => {
- let persist_cmd = BatchStoreCommand::Persist(PersistRequest::new(
- source, payload, digest, num_bytes, expiration,
- ));
- self.batch_store_tx
- .send(persist_cmd)
- .await
- .expect("BatchStore receiver not available");
+ let persist_request =
+ PersistRequest::new(source, payload, digest, num_bytes, expiration);
+ return Some(persist_request);
},
Err(e) => {
debug!("Could not append batch from {:?}, error {:?}", source, e);
@@ -204,6 +203,24 @@ impl BatchCoordinator {
) {
debug!("Could not append batch from {:?}, error {:?}", source, e);
}
+ None
+ }
+
+ fn persist(&self, persist_request: PersistRequest) {
+ let batch_reader = self.batch_reader.clone();
+ let network_sender = self.network_sender.clone();
+ let my_peer_id = self.my_peer_id;
+ tokio::spawn(async move {
+ let peer_id = persist_request.value.author;
+ if let Some(signed_digest) = batch_reader.persist(persist_request) {
+ if my_peer_id != peer_id {
+ counters::DELIVERED_BATCHES_COUNT.inc();
+ }
+ network_sender
+ .send_signed_digest(signed_digest, vec![peer_id])
+ .await;
+ }
+ });
}
pub(crate) async fn start(mut self) {
@@ -229,23 +246,21 @@ impl BatchCoordinator {
proof_tx,
) => {
debug!("QS: end batch cmd received, batch id = {}", batch_id);
- let (batch_store_command, fragment) = self
+ let (persist_request, fragment) = self
.handle_end_batch(fragment_payload, batch_id, logical_time, proof_tx)
.await;
self.network_sender.broadcast_fragment(fragment).await;
-
- self.batch_store_tx
- .send(batch_store_command)
- .await
- .expect("Failed to send to BatchStore");
+ self.persist(persist_request);
counters::NUM_FRAGMENT_PER_BATCH.observe((self.local_fragment_id + 1) as f64);
self.local_fragment_id = 0;
},
BatchCoordinatorCommand::RemoteFragment(fragment) => {
- self.handle_fragment(*fragment).await;
+ if let Some(persist_request) = self.handle_fragment(*fragment).await {
+ self.persist(persist_request);
+ }
},
}
}
diff --git a/consensus/src/quorum_store/batch_reader.rs b/consensus/src/quorum_store/batch_reader.rs
index d06fe11b8693d..ce78a63662707 100644
--- a/consensus/src/quorum_store/batch_reader.rs
+++ b/consensus/src/quorum_store/batch_reader.rs
@@ -4,53 +4,33 @@
use crate::{
network::QuorumStoreSender,
quorum_store::{
- batch_requester::BatchRequester,
- batch_store::BatchStoreCommand,
- counters,
- types::{Batch, PersistedValue},
- utils::RoundExpirations,
+ batch_requester::BatchRequester, batch_store::PersistRequest, counters,
+ quorum_store_db::QuorumStoreDB, types::PersistedValue, utils::RoundExpirations,
},
};
use anyhow::bail;
use aptos_consensus_types::{
common::Round,
- proof_of_store::{LogicalTime, ProofOfStore},
+ proof_of_store::{LogicalTime, ProofOfStore, SignedDigest},
};
use aptos_crypto::HashValue;
use aptos_executor_types::Error;
use aptos_logger::debug;
-use aptos_types::{transaction::SignedTransaction, validator_verifier::ValidatorVerifier, PeerId};
+use aptos_types::{
+ transaction::SignedTransaction, validator_signer::ValidatorSigner,
+ validator_verifier::ValidatorVerifier, PeerId,
+};
use dashmap::{
mapref::entry::Entry::{Occupied, Vacant},
DashMap,
};
use fail::fail_point;
use once_cell::sync::OnceCell;
-use std::{
- collections::HashMap,
- sync::{
- atomic::{AtomicBool, AtomicU64, Ordering},
- Arc, Mutex,
- },
- time::Duration,
+use std::sync::{
+ atomic::{AtomicU64, Ordering},
+ Arc, Mutex,
};
-use tokio::{
- sync::{
- mpsc::{Receiver, Sender},
- oneshot, Notify,
- },
- time,
-};
-
-#[derive(Debug)]
-pub(crate) enum BatchReaderCommand {
- GetBatchForPeer(HashValue, PeerId),
- GetBatchForSelf(
- ProofOfStore,
- oneshot::Sender, Error>>,
- ),
- BatchResponse(HashValue, Vec),
-}
+use tokio::sync::oneshot;
#[derive(PartialEq)]
enum StorageMode {
@@ -112,59 +92,60 @@ fn payload_storage_mode(persisted_value: &PersistedValue) -> StorageMode {
/// Provides in memory representation of stored batches (strong cache), and allows
/// efficient concurrent readers.
-pub struct BatchReader {
+pub struct BatchReader {
epoch: OnceCell,
- my_peer_id: PeerId,
last_certified_round: AtomicU64,
db_cache: DashMap,
peer_quota: DashMap,
expirations: Mutex>,
- batch_store_tx: Sender,
- self_tx: Sender,
+ db: Arc,
batch_expiry_round_gap_when_init: Round,
batch_expiry_round_gap_behind_latest_certified: Round,
batch_expiry_round_gap_beyond_latest_certified: Round,
expiry_grace_rounds: Round,
memory_quota: usize,
db_quota: usize,
- shutdown_flag: AtomicBool,
- shutdown_notify: Notify,
+ batch_requester: BatchRequester,
+ validator_signer: ValidatorSigner,
+ validator_verifier: ValidatorVerifier,
}
-impl BatchReader {
+impl BatchReader {
pub(crate) fn new(
epoch: u64,
last_certified_round: Round,
- db_content: HashMap,
- my_peer_id: PeerId,
- batch_store_tx: Sender,
- self_tx: Sender,
+ db: Arc,
batch_expiry_round_gap_when_init: Round,
batch_expiry_round_gap_behind_latest_certified: Round,
batch_expiry_round_gap_beyond_latest_certified: Round,
expiry_grace_rounds: Round,
memory_quota: usize,
db_quota: usize,
- ) -> (Arc, Vec) {
- let self_ob = Self {
+ batch_requester: BatchRequester,
+ validator_signer: ValidatorSigner,
+ validator_verifier: ValidatorVerifier,
+ ) -> Self {
+ let db_clone = db.clone();
+ let batch_reader = Self {
epoch: OnceCell::with_value(epoch),
- my_peer_id,
last_certified_round: AtomicU64::new(last_certified_round),
db_cache: DashMap::new(),
peer_quota: DashMap::new(),
expirations: Mutex::new(RoundExpirations::new()),
- batch_store_tx,
- self_tx,
+ db,
batch_expiry_round_gap_when_init,
batch_expiry_round_gap_behind_latest_certified,
batch_expiry_round_gap_beyond_latest_certified,
expiry_grace_rounds,
memory_quota,
db_quota,
- shutdown_flag: AtomicBool::new(false),
- shutdown_notify: Notify::new(),
+ batch_requester,
+ validator_signer,
+ validator_verifier,
};
-
+ let db_content = db_clone
+ .get_all_batches()
+ .expect("failed to read data from db");
let mut expired_keys = Vec::new();
debug!(
"QS: Batchreader {} {} {}",
@@ -186,23 +167,35 @@ impl BatchReader {
{
expired_keys.push(digest);
} else {
- self_ob
+ batch_reader
.update_cache(digest, value)
.expect("Storage limit exceeded upon BatchReader construction");
}
}
-
debug!(
"QS: Batchreader recovery expired keys len {}",
expired_keys.len()
);
- (Arc::new(self_ob), expired_keys)
+ db_clone.delete_batches(expired_keys).unwrap();
+
+ batch_reader
}
fn epoch(&self) -> u64 {
*self.epoch.get().unwrap()
}
+ fn free_quota(&self, persisted_value: PersistedValue) {
+ let mut quota_manager = self
+ .peer_quota
+ .get_mut(&persisted_value.author)
+ .expect("No QuotaManager for batch author");
+ quota_manager.free_quota(
+ persisted_value.num_bytes,
+ payload_storage_mode(&persisted_value),
+ );
+ }
+
// Return an error if storage quota is exceeded.
fn update_cache(&self, digest: HashValue, mut value: PersistedValue) -> anyhow::Result<()> {
let author = value.author;
@@ -269,11 +262,6 @@ impl BatchReader {
self.batch_expiry_round_gap_beyond_latest_certified);
}
- pub async fn shutdown(&self) {
- self.shutdown_flag.swap(true, Ordering::Relaxed);
- self.shutdown_notify.notified().await;
- }
-
fn clear_expired_payload(&self, certified_time: LogicalTime) -> Vec {
assert_eq!(
certified_time.epoch(),
@@ -312,15 +300,44 @@ impl BatchReader {
ret
}
- fn free_quota(&self, persisted_value: PersistedValue) {
- let mut quota_manager = self
- .peer_quota
- .get_mut(&persisted_value.author)
- .expect("No QuotaManager for batch author");
- quota_manager.free_quota(
- persisted_value.num_bytes,
- payload_storage_mode(&persisted_value),
+ pub fn persist(&self, persist_request: PersistRequest) -> Option {
+ let expiration = persist_request.value.expiration;
+ // Network listener should filter messages with wrong expiration epoch.
+ assert_eq!(
+ expiration.epoch(),
+ self.epoch(),
+ "Persist Request for a batch with an incorrect epoch"
);
+
+ match self.save(persist_request.digest, persist_request.value.clone()) // TODO: what if this comes from an old epoch?
+ {
+ Ok(needs_db) => {
+ let num_txns = persist_request.value.maybe_payload.as_ref().unwrap().len() as u64;
+ let num_bytes = persist_request.value.num_bytes as u64;
+ let batch_author = persist_request.value.author;
+ debug!("QS: sign digest");
+ if needs_db {
+ // TODO: Consider an async call to DB, but it could be a race with clean.
+ self.db
+ .save_batch(persist_request.digest, persist_request.value)
+ .expect("Could not write to DB");
+ }
+ Some(SignedDigest::new(
+ batch_author,
+ self.epoch(),
+ persist_request.digest,
+ expiration,
+ num_txns,
+ num_bytes,
+ &self.validator_signer,
+ ).unwrap())
+ }
+
+ Err(e) => {
+ debug!("QS: failed to store to cache {:?}", e);
+ None
+ }
+ }
}
// TODO: make sure state-sync also sends the message, or execution cleans.
@@ -352,12 +369,8 @@ impl BatchReader {
);
let expired_keys = self.clear_expired_payload(certified_time);
- if let Err(e) = self
- .batch_store_tx
- .send(BatchStoreCommand::Clean(expired_keys))
- .await
- {
- debug!("QS: Failed to send to BatchStore: {:?}", e);
+ if let Err(e) = self.db.delete_batches(expired_keys) {
+ debug!("Error deleting batches: {:?}", e)
}
}
@@ -365,127 +378,66 @@ impl BatchReader {
self.last_certified_round.load(Ordering::Relaxed)
}
- pub async fn get_batch(
- &self,
- proof: ProofOfStore,
- ) -> oneshot::Receiver, Error>> {
- let (tx, rx) = oneshot::channel();
+ fn get_batch_from_db(&self, digest: &HashValue) -> Result<Vec<SignedTransaction>, Error> {
+ counters::GET_BATCH_FROM_DB_COUNT.inc();
+
+ match self.db.get_batch(digest) {
+ Ok(Some(persisted_value)) => {
+ let payload = persisted_value
+ .maybe_payload
+ .expect("Persisted value in QuorumStore DB must have payload");
+ return Ok(payload);
+ },
+ Ok(None) => {
+ unreachable!("Could not read persisted value (according to BatchReader) from DB")
+ },
+ Err(_) => {
+ // TODO: handle error, e.g. from self or not, log, panic.
+ },
+ }
+ Err(Error::CouldNotGetData)
+ }
- if let Some(value) = self.db_cache.get(proof.digest()) {
+ pub fn get_batch_from_local(
+ &self,
+ digest: &HashValue,
+ ) -> Result<Vec<SignedTransaction>, Error> {
+ if let Some(value) = self.db_cache.get(digest) {
if payload_storage_mode(&value) == StorageMode::PersistedOnly {
assert!(
value.maybe_payload.is_none(),
"BatchReader payload and storage kind mismatch"
);
- self.batch_store_tx
- .send(BatchStoreCommand::BatchRequest(
- *proof.digest(),
- self.my_peer_id,
- Some(tx),
- ))
- .await
- .expect("Failed to send to BatchStore");
+ self.get_batch_from_db(digest)
} else {
// Available in memory.
- if tx
- .send(Ok(value
- .maybe_payload
- .clone()
- .expect("BatchReader payload and storage kind mismatch")))
- .is_err()
- {
- debug!(
- "Receiver of requested batch is not available for digest {}",
- proof.digest()
- );
- }
+ Ok(value
+ .maybe_payload
+ .clone()
+ .expect("BatchReader payload and storage kind mismatch"))
}
} else {
- // Quorum store metrics
- counters::MISSED_BATCHES_COUNT.inc();
-
- self.self_tx
- .send(BatchReaderCommand::GetBatchForSelf(proof, tx))
- .await
- .expect("Batch Reader Receiver is not available");
+ Err(Error::CouldNotGetData)
}
- rx
}
- pub(crate) async fn start(
+ pub fn get_batch(
&self,
- mut batch_reader_rx: Receiver,
- network_sender: T,
- request_num_peers: usize,
- request_timeout_ms: usize,
- verifier: ValidatorVerifier,
- ) {
- debug!(
- "[QS worker] BatchReader worker for epoch {} starting",
- self.epoch()
- );
-
- let mut batch_requester = BatchRequester::new(
- self.epoch(),
- self.my_peer_id,
- request_num_peers,
- request_timeout_ms,
- network_sender.clone(),
- );
-
- let mut interval = time::interval(Duration::from_millis(100));
-
- loop {
- tokio::select! {
- biased;
-
- _ = interval.tick() => {
- batch_requester.handle_timeouts().await;
- if self.shutdown_flag.load(Ordering::Relaxed) {
- break;
- }
- },
+ proof: ProofOfStore,
+ ) -> oneshot::Receiver<Result<Vec<SignedTransaction>, Error>> {
+ let (tx, rx) = oneshot::channel();
- Some(cmd) = batch_reader_rx.recv() => {
- match cmd {
- BatchReaderCommand::GetBatchForPeer(digest, peer_id) => {
- if let Some(value) = self.db_cache.get(&digest) {
- match payload_storage_mode(&value) {
- StorageMode::PersistedOnly => {
- assert!(value.maybe_payload.is_none(), "BatchReader payload and storage kind mismatch");
- if self.batch_store_tx.send(BatchStoreCommand::BatchRequest(digest, peer_id, None)).await.is_err() {
- debug!("Failed to send request to BatchStore");
- }
- },
- StorageMode::MemoryAndPersisted => {
- let batch = Batch::new(
- self.my_peer_id,
- self.epoch(),
- digest,
- value.maybe_payload.clone().expect("BatchReader payload and storage kind mismatch"),
- );
- network_sender.send_batch(batch, vec![peer_id]).await;
- },
- } // TODO: consider returning Nack
- }
- },
- BatchReaderCommand::GetBatchForSelf(proof, ret_tx) => {
- batch_requester
- .add_request(*proof.digest(), proof.shuffled_signers(&verifier), ret_tx)
- .await;
- },
- BatchReaderCommand::BatchResponse(digest, payload) => {
- batch_requester.serve_request(digest, payload);
- },
- }
- },
- }
+ if let Ok(value) = self.get_batch_from_local(proof.digest()) {
+ tx.send(Ok(value)).unwrap();
+ } else {
+ // Quorum store metrics
+ counters::MISSED_BATCHES_COUNT.inc();
+ self.batch_requester.request_batch(
+ *proof.digest(),
+ proof.shuffled_signers(&self.validator_verifier),
+ tx,
+ );
}
-
- self.shutdown_notify.notify_one();
- debug!(
- "[QS worker] BatchReader worker for epoch {} stopping",
- self.epoch()
- );
+ rx
}
}
diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs
index 578458539d30d..fec2f0b963a41 100644
--- a/consensus/src/quorum_store/batch_requester.rs
+++ b/consensus/src/quorum_store/batch_requester.rs
@@ -3,14 +3,14 @@
use crate::{
network::QuorumStoreSender,
- quorum_store::{counters, types::BatchRequest, utils::DigestTimeouts},
+ quorum_store::{counters, types::BatchRequest},
};
-use aptos_crypto::{hash::DefaultHasher, HashValue};
+use aptos_crypto::HashValue;
use aptos_executor_types::*;
use aptos_logger::debug;
use aptos_types::{transaction::SignedTransaction, PeerId};
-use bcs::to_bytes;
-use std::collections::HashMap;
+use futures::{stream::FuturesUnordered, StreamExt};
+use std::time::Duration;
use tokio::sync::oneshot;
struct BatchRequesterState {
@@ -79,17 +79,15 @@ impl BatchRequesterState {
}
}
-pub(crate) struct BatchRequester {
+pub(crate) struct BatchRequester {
epoch: u64,
my_peer_id: PeerId,
request_num_peers: usize,
request_timeout_ms: usize,
- digest_to_state: HashMap,
- timeouts: DigestTimeouts,
network_sender: T,
}
-impl BatchRequester {
+impl BatchRequester {
pub(crate) fn new(
epoch: u64,
my_peer_id: PeerId,
@@ -102,71 +100,46 @@ impl BatchRequester {
my_peer_id,
request_num_peers,
request_timeout_ms,
- digest_to_state: HashMap::new(),
- timeouts: DigestTimeouts::new(),
network_sender,
}
}
- async fn send_requests(&self, digest: HashValue, request_peers: Vec) {
- // Quorum Store measurements
- counters::SENT_BATCH_REQUEST_COUNT.inc();
- let request = BatchRequest::new(self.my_peer_id, self.epoch, digest);
- self.network_sender
- .send_batch_request(request, request_peers)
- .await;
- }
-
- pub(crate) async fn add_request(
- &mut self,
+ pub(crate) fn request_batch(
+ &self,
digest: HashValue,
signers: Vec,
- ret_tx: oneshot::Sender, aptos_executor_types::Error>>,
+ ret_tx: oneshot::Sender<Result<Vec<SignedTransaction>, Error>>,
) {
let mut request_state = BatchRequesterState::new(signers, ret_tx);
- let request_peers = request_state
- .next_request_peers(self.request_num_peers)
- .unwrap(); // note: this is the first try
-
- debug!("QS: requesting from {:?}", request_peers);
+ let network_sender = self.network_sender.clone();
+ let request_num_peers = self.request_num_peers;
+ let my_peer_id = self.my_peer_id;
+ let epoch = self.epoch;
+ let timeout = Duration::from_millis(self.request_timeout_ms as u64);
- self.digest_to_state.insert(digest, request_state);
- self.send_requests(digest, request_peers).await;
- self.timeouts.add_digest(digest, self.request_timeout_ms);
- }
-
- pub(crate) async fn handle_timeouts(&mut self) {
- for digest in self.timeouts.expire() {
- debug!("QS: timed out batch request, digest = {}", digest);
- if let Some(state) = self.digest_to_state.get_mut(&digest) {
- if let Some(request_peers) = state.next_request_peers(self.request_num_peers) {
- // Quorum Store measurements
- counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc();
- self.send_requests(digest, request_peers).await;
- self.timeouts.add_digest(digest, self.request_timeout_ms);
- } else {
- let state = self.digest_to_state.remove(&digest).unwrap();
- state.serve_request(digest, None);
+ tokio::spawn(async move {
+ while let Some(request_peers) = request_state.next_request_peers(request_num_peers) {
+ let mut futures = FuturesUnordered::new();
+ debug!("QS: requesting from {:?}", request_peers);
+ let request = BatchRequest::new(my_peer_id, epoch, digest);
+ for peer in request_peers {
+ counters::SENT_BATCH_REQUEST_COUNT.inc();
+ futures.push(network_sender.request_batch(request.clone(), peer, timeout));
}
+ while let Some(response) = futures.next().await {
+ if let Ok(batch) = response {
+ counters::RECEIVED_BATCH_COUNT.inc();
+ if batch.verify().is_ok() {
+ let digest = batch.digest();
+ let payload = batch.into_payload();
+ request_state.serve_request(digest, Some(payload));
+ return;
+ }
+ }
+ }
+ counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc();
}
- }
- }
-
- pub(crate) fn serve_request(&mut self, digest: HashValue, payload: Vec) {
- if self.digest_to_state.contains_key(&digest) {
- let mut hasher = DefaultHasher::new(b"QuorumStoreBatch");
- let serialized_payload: Vec = payload
- .iter()
- .flat_map(|txn| to_bytes(txn).unwrap())
- .collect();
- hasher.update(&serialized_payload);
- if hasher.finish() == digest {
- debug!("QS: serving batch digest = {}", digest);
- let state = self.digest_to_state.remove(&digest).unwrap();
- state.serve_request(digest, Some(payload));
- } else {
- debug!("Payload does not fit digest")
- }
- }
+ request_state.serve_request(digest, None);
+ });
}
}
diff --git a/consensus/src/quorum_store/batch_store.rs b/consensus/src/quorum_store/batch_store.rs
index c9a9967c9dd02..560a7b5a0be54 100644
--- a/consensus/src/quorum_store/batch_store.rs
+++ b/consensus/src/quorum_store/batch_store.rs
@@ -1,38 +1,16 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0
-use crate::{
- network::QuorumStoreSender,
- quorum_store::{
- batch_reader::{BatchReader, BatchReaderCommand},
- counters,
- proof_coordinator::ProofCoordinatorCommand,
- quorum_store_db::QuorumStoreDB,
- types::{Batch, PersistedValue},
- },
-};
-// use aptos_logger::spawn_named;
-use aptos_consensus_types::{
- common::Round,
- proof_of_store::{LogicalTime, SignedDigest},
-};
+use crate::quorum_store::types::PersistedValue;
+use aptos_consensus_types::proof_of_store::LogicalTime;
use aptos_crypto::HashValue;
-use aptos_logger::debug;
-use aptos_types::{
- transaction::SignedTransaction, validator_signer::ValidatorSigner,
- validator_verifier::ValidatorVerifier, PeerId,
-};
+use aptos_types::{transaction::SignedTransaction, PeerId};
use serde::{Deserialize, Serialize};
-use std::sync::Arc;
-use tokio::sync::{
- mpsc::{Receiver, Sender},
- oneshot,
-};
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PersistRequest {
- digest: HashValue,
- value: PersistedValue,
+ pub digest: HashValue,
+ pub value: PersistedValue,
}
impl PersistRequest {
@@ -49,233 +27,3 @@ impl PersistRequest {
}
}
}
-
-#[derive(Debug)]
-pub(crate) enum BatchStoreCommand {
- Persist(PersistRequest),
- BatchRequest(
- HashValue,
- PeerId,
- Option, aptos_executor_types::Error>>>,
- ),
- Clean(Vec),
- Shutdown(oneshot::Sender<()>),
-}
-
-pub(crate) struct BatchStore {
- epoch: u64,
- my_peer_id: PeerId,
- network_sender: T,
- batch_reader: Arc,
- db: Arc,
- validator_signer: Arc,
-}
-
-// TODO: send config to reduce number of arguments?
-#[allow(clippy::too_many_arguments)]
-impl BatchStore {
- pub fn new(
- epoch: u64,
- last_committed_round: Round,
- my_peer_id: PeerId,
- network_sender: T,
- batch_store_tx: Sender,
- batch_reader_tx: Sender,
- batch_reader_rx: Receiver,
- db: Arc,
- validator_verifier: ValidatorVerifier,
- validator_signer: Arc,
- batch_expiry_round_gap_when_init: Round,
- batch_expiry_round_gap_behind_latest_certified: Round,
- batch_expiry_round_gap_beyond_latest_certified: Round,
- batch_expiry_grace_rounds: Round,
- batch_request_num_peers: usize,
- batch_request_timeout_ms: usize,
- memory_quota: usize,
- db_quota: usize,
- ) -> (Self, Arc) {
- let db_content = db.get_all_batches().expect("failed to read data from db");
- debug!("QS: db size {}", db_content.len());
-
- let (batch_reader, expired_keys) = BatchReader::new(
- epoch,
- last_committed_round,
- db_content,
- my_peer_id,
- batch_store_tx,
- batch_reader_tx,
- batch_expiry_round_gap_when_init,
- batch_expiry_round_gap_behind_latest_certified,
- batch_expiry_round_gap_beyond_latest_certified,
- batch_expiry_grace_rounds,
- memory_quota,
- db_quota,
- );
- if let Err(e) = db.delete_batches(expired_keys) {
- debug!("Error deleting batches: {:?}", e)
- }
- let batch_reader_clone = batch_reader.clone();
- let net = network_sender.clone();
- let metrics_monitor = tokio_metrics::TaskMonitor::new();
-
- tokio::spawn(async move {
- metrics_monitor
- .instrument(batch_reader_clone.start(
- batch_reader_rx,
- net,
- batch_request_num_peers,
- batch_request_timeout_ms,
- validator_verifier,
- ))
- .await
- });
-
- let batch_reader_clone = batch_reader.clone();
- (
- Self {
- epoch,
- my_peer_id,
- network_sender,
- batch_reader,
- db,
- validator_signer,
- },
- batch_reader_clone,
- )
- }
-
- fn store(&self, persist_request: PersistRequest) -> Option {
- debug!("QS: store");
- let expiration = persist_request.value.expiration;
- // Network listener should filter messages with wrong expiration epoch.
- assert_eq!(
- expiration.epoch(),
- self.epoch,
- "Persist Request for a batch with an incorrect epoch"
- );
- match self
- .batch_reader
- .save(persist_request.digest, persist_request.value.clone()) // TODO: what is this comes from old epoch?
- {
- Ok(needs_db) => {
- let batch_author = persist_request.value.author;
- let num_txns = persist_request.value.maybe_payload.as_ref().unwrap().len() as u64;
- let num_bytes = persist_request.value.num_bytes as u64;
- debug!("QS: sign digest");
- if needs_db {
- // TODO: Consider an async call to DB, but it could be a race with clean.
- self.db
- .save_batch(persist_request.digest, persist_request.value)
- .expect("Could not write to DB");
- }
- Some(SignedDigest::new(
- batch_author,
- self.epoch,
- persist_request.digest,
- expiration,
- num_txns,
- num_bytes,
- self.validator_signer.clone(),
- ).unwrap())
- }
-
- Err(e) => {
- debug!("QS: failed to store to cache {:?}", e);
- None
- }
- }
- }
-
- pub async fn start(
- self,
- mut batch_store_rx: Receiver,
- proof_coordinator_tx: Sender,
- ) {
- debug!(
- "[QS worker] BatchStore worker for epoch {} starting",
- self.epoch
- );
-
- while let Some(command) = batch_store_rx.recv().await {
- match command {
- BatchStoreCommand::Shutdown(ack_tx) => {
- self.batch_reader.shutdown().await;
- ack_tx
- .send(())
- .expect("Failed to send shutdown ack to QuorumStore");
- break;
- },
- BatchStoreCommand::Persist(persist_request) => {
- let author = persist_request.value.author;
- if let Some(signed_digest) = self.store(persist_request) {
- if self.my_peer_id == author {
- proof_coordinator_tx
- .send(ProofCoordinatorCommand::AppendSignature(signed_digest))
- .await
- .expect("Failed to send to ProofBuilder");
- debug!("QS: sent signed digest to ProofBuilder");
- } else {
- // Quorum store metrics
- counters::DELIVERED_BATCHES_COUNT.inc();
-
- self.network_sender
- .send_signed_digest(signed_digest, vec![author])
- .await;
- debug!("QS: sent signed digest back to sender");
- }
- }
- },
- BatchStoreCommand::Clean(digests) => {
- if let Err(e) = self.db.delete_batches(digests) {
- debug!("Error deleting batches: {:?}", e)
- }
- },
- BatchStoreCommand::BatchRequest(digest, peer_id, maybe_tx) => {
- counters::GET_BATCH_FROM_DB_COUNT.inc();
-
- match self.db.get_batch(digest) {
- Ok(Some(persisted_value)) => {
- let payload = persisted_value
- .maybe_payload
- .expect("Persisted value in QuorumStore DB must have payload");
- match maybe_tx {
- Some(payload_tx) => {
- assert_eq!(
- self.my_peer_id, peer_id,
- "Return channel must be to self"
- );
- if payload_tx.send(Ok(payload)).is_err() {
- debug!(
- "Failed to send PersistedValue for digest {}",
- digest
- );
- }
- },
- None => {
- assert_ne!(
- self.my_peer_id, peer_id,
- "Request from self without return channel"
- );
- let batch =
- Batch::new(self.my_peer_id, self.epoch, digest, payload);
- self.network_sender.send_batch(batch, vec![peer_id]).await;
- },
- }
- },
- Ok(None) => unreachable!(
- "Could not read persisted value (according to BatchReader) from DB"
- ),
- Err(_) => {
- // TODO: handle error, e.g. from self or not, log, panic.
- },
- }
- },
- }
- }
-
- debug!(
- "[QS worker] BatchStore worker for epoch {} stopping",
- self.epoch
- );
- }
-}
diff --git a/consensus/src/quorum_store/network_listener.rs b/consensus/src/quorum_store/network_listener.rs
index 9144db43d9252..8f9b7a385cb44 100644
--- a/consensus/src/quorum_store/network_listener.rs
+++ b/consensus/src/quorum_store/network_listener.rs
@@ -3,7 +3,7 @@
use crate::{
quorum_store::{
- batch_coordinator::BatchCoordinatorCommand, batch_reader::BatchReaderCommand, counters,
+ batch_coordinator::BatchCoordinatorCommand, counters,
proof_coordinator::ProofCoordinatorCommand, proof_manager::ProofManagerCommand,
},
round_manager::VerifiedEvent,
@@ -16,7 +16,6 @@ use tokio::sync::mpsc::Sender;
pub(crate) struct NetworkListener {
network_msg_rx: aptos_channel::Receiver,
- batch_reader_tx: Sender,
proof_coordinator_tx: Sender,
remote_batch_coordinator_tx: Vec>,
proof_manager_tx: Sender,
@@ -25,14 +24,12 @@ pub(crate) struct NetworkListener {
impl NetworkListener {
pub(crate) fn new(
network_msg_rx: aptos_channel::Receiver,
- batch_reader_tx: Sender,
proof_coordinator_tx: Sender,
remote_batch_coordinator_tx: Vec>,
proof_manager_tx: Sender,
) -> Self {
Self {
network_msg_rx,
- batch_reader_tx,
proof_coordinator_tx,
remote_batch_coordinator_tx,
proof_manager_tx,
@@ -78,34 +75,6 @@ impl NetworkListener {
.await
.expect("Could not send remote fragment");
},
- VerifiedEvent::BatchRequestMsg(request) => {
- counters::RECEIVED_BATCH_REQUEST_COUNT.inc();
- debug!(
- "QS: batch request from {:?} digest {}",
- request.source(),
- request.digest()
- );
- let cmd =
- BatchReaderCommand::GetBatchForPeer(request.digest(), request.source());
- self.batch_reader_tx
- .send(cmd)
- .await
- .expect("could not push Batch batch_reader");
- },
- VerifiedEvent::UnverifiedBatchMsg(batch) => {
- counters::RECEIVED_BATCH_COUNT.inc();
- debug!(
- "QS: batch response from {:?} digest {}",
- batch.source(),
- batch.digest()
- );
- let cmd =
- BatchReaderCommand::BatchResponse(batch.digest(), batch.into_payload());
- self.batch_reader_tx
- .send(cmd)
- .await
- .expect("could not push Batch batch_reader");
- },
VerifiedEvent::ProofOfStoreMsg(proof) => {
counters::REMOTE_POS_COUNT.inc();
let cmd = ProofManagerCommand::RemoteProof(*proof);
diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs
index 73c2cdc284457..c8443e4bd90f2 100644
--- a/consensus/src/quorum_store/quorum_store_builder.rs
+++ b/consensus/src/quorum_store/quorum_store_builder.rs
@@ -2,19 +2,22 @@
// SPDX-License-Identifier: Apache-2.0
use crate::{
- network::NetworkSender,
+ network::{IncomingBatchRetrievalRequest, NetworkSender},
+ network_interface::ConsensusMsg,
payload_manager::PayloadManager,
quorum_store::{
batch_coordinator::{BatchCoordinator, BatchCoordinatorCommand},
batch_generator::{BatchGenerator, BatchGeneratorCommand},
- batch_reader::{BatchReader, BatchReaderCommand},
- batch_store::{BatchStore, BatchStoreCommand},
+ batch_reader::BatchReader,
+ batch_requester::BatchRequester,
+ counters,
direct_mempool_quorum_store::DirectMempoolQuorumStore,
network_listener::NetworkListener,
proof_coordinator::{ProofCoordinator, ProofCoordinatorCommand},
proof_manager::{ProofManager, ProofManagerCommand},
quorum_store_coordinator::{CoordinatorCommand, QuorumStoreCoordinator},
quorum_store_db::QuorumStoreDB,
+ types::Batch,
},
round_manager::VerifiedEvent,
};
@@ -31,8 +34,9 @@ use aptos_types::{
account_address::AccountAddress, validator_signer::ValidatorSigner,
validator_verifier::ValidatorVerifier,
};
+use futures::StreamExt;
use futures_channel::mpsc::{Receiver, Sender};
-use std::{path::PathBuf, sync::Arc, time::Duration};
+use std::{sync::Arc, time::Duration};
pub enum QuorumStoreBuilder {
DirectMempool(DirectMempoolInnerBuilder),
@@ -52,13 +56,16 @@ impl QuorumStoreBuilder {
}
}
- pub fn start(self) -> Option> {
+ pub fn start(
+ self,
+ rx: aptos_channel::Receiver<AccountAddress, IncomingBatchRetrievalRequest>,
+ ) -> Option> {
match self {
QuorumStoreBuilder::DirectMempool(inner) => {
inner.start();
None
},
- QuorumStoreBuilder::QuorumStore(inner) => Some(inner.start()),
+ QuorumStoreBuilder::QuorumStore(inner) => Some(inner.start(rx)),
}
}
}
@@ -123,18 +130,14 @@ pub struct InnerBuilder {
proof_coordinator_cmd_rx: Option>,
proof_manager_cmd_tx: tokio::sync::mpsc::Sender,
proof_manager_cmd_rx: Option>,
- batch_store_cmd_tx: tokio::sync::mpsc::Sender,
- batch_store_cmd_rx: Option>,
- batch_reader_cmd_tx: tokio::sync::mpsc::Sender,
- batch_reader_cmd_rx: Option>,
back_pressure_tx: tokio::sync::mpsc::Sender,
back_pressure_rx: Option>,
- quorum_store_storage_path: PathBuf,
- quorum_store_storage: Option>,
+ quorum_store_storage: Arc<QuorumStoreDB>,
quorum_store_msg_tx: aptos_channel::Sender,
quorum_store_msg_rx: Option>,
remote_batch_coordinator_cmd_tx: Vec>,
remote_batch_coordinator_cmd_rx: Vec>,
+ batch_reader: Option<Arc<BatchReader<NetworkSender>>>,
}
impl InnerBuilder {
@@ -149,7 +152,7 @@ impl InnerBuilder {
network_sender: NetworkSender,
verifier: ValidatorVerifier,
backend: SecureBackend,
- quorum_store_storage_path: PathBuf,
+ quorum_store_storage: Arc<QuorumStoreDB>,
) -> Self {
let (coordinator_tx, coordinator_rx) = futures_channel::mpsc::channel(config.channel_size);
let (batch_generator_cmd_tx, batch_generator_cmd_rx) =
@@ -160,10 +163,6 @@ impl InnerBuilder {
tokio::sync::mpsc::channel(config.channel_size);
let (proof_manager_cmd_tx, proof_manager_cmd_rx) =
tokio::sync::mpsc::channel(config.channel_size);
- let (batch_store_cmd_tx, batch_store_cmd_rx) =
- tokio::sync::mpsc::channel(config.channel_size);
- let (batch_reader_cmd_tx, batch_reader_cmd_rx) =
- tokio::sync::mpsc::channel(config.channel_size);
let (back_pressure_tx, back_pressure_rx) = tokio::sync::mpsc::channel(config.channel_size);
let (quorum_store_msg_tx, quorum_store_msg_rx) =
aptos_channel::new::(
@@ -201,22 +200,18 @@ impl InnerBuilder {
proof_coordinator_cmd_rx: Some(proof_coordinator_cmd_rx),
proof_manager_cmd_tx,
proof_manager_cmd_rx: Some(proof_manager_cmd_rx),
- batch_store_cmd_tx,
- batch_store_cmd_rx: Some(batch_store_cmd_rx),
- batch_reader_cmd_tx,
- batch_reader_cmd_rx: Some(batch_reader_cmd_rx),
back_pressure_tx,
back_pressure_rx: Some(back_pressure_rx),
- quorum_store_storage_path,
- quorum_store_storage: None,
+ quorum_store_storage,
quorum_store_msg_tx,
quorum_store_msg_rx: Some(quorum_store_msg_rx),
remote_batch_coordinator_cmd_tx,
remote_batch_coordinator_cmd_rx,
+ batch_reader: None,
}
}
- fn spawn_quorum_store(&mut self) -> Arc {
+ fn create_batch_store(&mut self) -> Arc<BatchReader<NetworkSender>> {
let backend = &self.backend;
let storage: Storage = backend.try_into().expect("Unable to initialize storage");
if let Err(error) = storage.available() {
@@ -246,38 +241,37 @@ impl InnerBuilder {
0
};
- let batch_store_cmd_rx = self.batch_store_cmd_rx.take().unwrap();
- let batch_reader_cmd_rx = self.batch_reader_cmd_rx.take().unwrap();
- let (batch_store, batch_reader) = BatchStore::new(
+ let batch_requester = BatchRequester::new(
self.epoch,
- last_committed_round,
self.author,
+ self.config.batch_request_num_peers,
+ self.config.batch_request_timeout_ms,
self.network_sender.clone(),
- self.batch_store_cmd_tx.clone(),
- self.batch_reader_cmd_tx.clone(),
- batch_reader_cmd_rx,
- self.quorum_store_storage.as_ref().unwrap().clone(),
- self.verifier.clone(),
- Arc::new(signer),
+ );
+ let batch_reader = Arc::new(BatchReader::new(
+ self.epoch,
+ last_committed_round,
+ self.quorum_store_storage.clone(),
self.config.batch_expiry_round_gap_when_init,
self.config.batch_expiry_round_gap_behind_latest_certified,
self.config.batch_expiry_round_gap_beyond_latest_certified,
self.config.batch_expiry_grace_rounds,
- self.config.batch_request_num_peers,
- self.config.batch_request_timeout_ms,
self.config.memory_quota,
self.config.db_quota,
- );
- spawn_named!(
- "batch_store",
- batch_store.start(batch_store_cmd_rx, self.proof_coordinator_cmd_tx.clone())
- );
+ batch_requester,
+ signer,
+ self.verifier.clone(),
+ ));
+ self.batch_reader = Some(batch_reader.clone());
batch_reader
}
- fn spawn_quorum_store_wrapper(mut self) -> Sender {
- let quorum_store_storage = self.quorum_store_storage.as_ref().unwrap().clone();
+ fn spawn_quorum_store(
+ mut self,
+ mut rx: aptos_channel::Receiver<AccountAddress, IncomingBatchRetrievalRequest>,
+ ) -> Sender {
+ let quorum_store_storage = self.quorum_store_storage.clone();
// TODO: parameter? bring back back-off?
let interval = tokio::time::interval(Duration::from_millis(
@@ -292,7 +286,6 @@ impl InnerBuilder {
self.remote_batch_coordinator_cmd_tx.clone(),
self.proof_coordinator_cmd_tx.clone(),
self.proof_manager_cmd_tx.clone(),
- self.batch_store_cmd_tx.clone(),
self.quorum_store_msg_tx.clone(),
);
spawn_named!(
@@ -321,7 +314,7 @@ impl InnerBuilder {
self.author,
self.network_sender.clone(),
batch_coordinator_cmd_rx,
- self.batch_store_cmd_tx.clone(),
+ self.batch_reader.clone().unwrap(),
self.proof_coordinator_cmd_tx.clone(),
self.config.max_batch_bytes,
);
@@ -335,7 +328,7 @@ impl InnerBuilder {
self.author,
self.network_sender.clone(),
remote_batch_coordinator_cmd_rx,
- self.batch_store_cmd_tx.clone(),
+ self.batch_reader.clone().unwrap(),
self.proof_coordinator_cmd_tx.clone(),
self.config.max_batch_bytes,
);
@@ -373,13 +366,27 @@ impl InnerBuilder {
let network_msg_rx = self.quorum_store_msg_rx.take().unwrap();
let net = NetworkListener::new(
network_msg_rx,
- self.batch_reader_cmd_tx.clone(),
self.proof_coordinator_cmd_tx.clone(),
self.remote_batch_coordinator_cmd_tx.clone(),
self.proof_manager_cmd_tx.clone(),
);
spawn_named!("network_listener", net.start());
+ let batch_reader = self.batch_reader.clone().unwrap();
+ let author = self.author;
+ let epoch = self.epoch;
+ spawn_named!("batch_serve", async move {
+ while let Some(rpc_request) = rx.next().await {
+ counters::RECEIVED_BATCH_REQUEST_COUNT.inc();
+ if let Ok(value) = batch_reader.get_batch_from_local(&rpc_request.req.digest()) {
+ let batch = Batch::new(author, epoch, rpc_request.req.digest(), value);
+ let msg = ConsensusMsg::BatchMsg(Box::new(batch));
+ let bytes = rpc_request.protocol.to_bytes(&msg).unwrap();
+ let _ = rpc_request.response_sender.send(Ok(bytes.into()));
+ }
+ }
+ });
+
self.coordinator_tx
}
@@ -389,11 +396,7 @@ impl InnerBuilder {
Arc,
Option>,
) {
- self.quorum_store_storage = Some(Arc::new(QuorumStoreDB::new(
- self.quorum_store_storage_path.clone(),
- )));
-
- let batch_reader = self.spawn_quorum_store();
+ let batch_reader = self.create_batch_store();
(
Arc::from(PayloadManager::InQuorumStore(
@@ -405,7 +408,10 @@ impl InnerBuilder {
)
}
- fn start(self) -> Sender {
- self.spawn_quorum_store_wrapper()
+ fn start(
+ self,
+ rx: aptos_channel::Receiver<AccountAddress, IncomingBatchRetrievalRequest>,
+ ) -> Sender {
+ self.spawn_quorum_store(rx)
}
}
diff --git a/consensus/src/quorum_store/quorum_store_coordinator.rs b/consensus/src/quorum_store/quorum_store_coordinator.rs
index ac64a6bca8da3..d829e8b9b6f3f 100644
--- a/consensus/src/quorum_store/quorum_store_coordinator.rs
+++ b/consensus/src/quorum_store/quorum_store_coordinator.rs
@@ -4,8 +4,7 @@
use crate::{
quorum_store::{
batch_coordinator::BatchCoordinatorCommand, batch_generator::BatchGeneratorCommand,
- batch_store::BatchStoreCommand, proof_coordinator::ProofCoordinatorCommand,
- proof_manager::ProofManagerCommand,
+ proof_coordinator::ProofCoordinatorCommand, proof_manager::ProofManagerCommand,
},
round_manager::VerifiedEvent,
};
@@ -29,7 +28,6 @@ pub struct QuorumStoreCoordinator {
remote_batch_coordinator_cmd_tx: Vec>,
proof_coordinator_cmd_tx: mpsc::Sender,
proof_manager_cmd_tx: mpsc::Sender,
- batch_store_cmd_tx: mpsc::Sender,
quorum_store_msg_tx: aptos_channel::Sender,
}
@@ -41,7 +39,6 @@ impl QuorumStoreCoordinator {
remote_batch_coordinator_cmd_tx: Vec>,
proof_coordinator_cmd_tx: mpsc::Sender,
proof_manager_cmd_tx: mpsc::Sender,
- batch_store_cmd_tx: mpsc::Sender,
quorum_store_msg_tx: aptos_channel::Sender,
) -> Self {
Self {
@@ -51,7 +48,6 @@ impl QuorumStoreCoordinator {
remote_batch_coordinator_cmd_tx,
proof_coordinator_cmd_tx,
proof_manager_cmd_tx,
- batch_store_cmd_tx,
quorum_store_msg_tx,
}
}
@@ -133,15 +129,6 @@ impl QuorumStoreCoordinator {
.expect("Failed to stop Remote BatchCoordinator");
}
- let (batch_store_shutdown_tx, batch_store_shutdown_rx) = oneshot::channel();
- self.batch_store_cmd_tx
- .send(BatchStoreCommand::Shutdown(batch_store_shutdown_tx))
- .await
- .expect("Failed to send to BatchStore");
- batch_store_shutdown_rx
- .await
- .expect("Failed to stop BatchStore");
-
let (proof_coordinator_shutdown_tx, proof_coordinator_shutdown_rx) =
oneshot::channel();
self.proof_coordinator_cmd_tx
diff --git a/consensus/src/quorum_store/quorum_store_db.rs b/consensus/src/quorum_store/quorum_store_db.rs
index f64a1372d98f4..6c7673a3f6f02 100644
--- a/consensus/src/quorum_store/quorum_store_db.rs
+++ b/consensus/src/quorum_store/quorum_store_db.rs
@@ -76,8 +76,8 @@ impl QuorumStoreDB {
Ok(self.db.put::(&digest, &batch)?)
}
- pub(crate) fn get_batch(&self, digest: HashValue) -> Result