[quorum store] refactor batch_reader/store and batch requester
This commit merges batch_reader and batch_store into a single struct and uses it as a synchronous component (via function calls) instead of an asynchronous one (via channels).

It also refactors batch_requester to use the network RPC directly.
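To illustrate the shape of the change: instead of a batch_reader task that serves lookups over command channels, callers now hold the merged store and call it directly. A minimal sketch of the new calling convention, using simplified stand-in types rather than the actual aptos-consensus API:

```rust
use std::collections::HashMap;

// Stand-ins for the real types (hypothetical; the actual digest is a
// HashValue and batches carry signed transactions).
type Digest = u64;
type Batch = Vec<u8>;

// After the refactor: reader and store live in one struct, and callers
// invoke it synchronously. Before, a lookup meant pushing a command onto
// a channel and awaiting a reply from the batch_reader task.
pub struct BatchStore {
    db: HashMap<Digest, Batch>,
}

impl BatchStore {
    pub fn new() -> Self {
        Self { db: HashMap::new() }
    }

    pub fn save(&mut self, digest: Digest, batch: Batch) {
        self.db.insert(digest, batch);
    }

    // A plain method call replaces the channel round trip.
    pub fn get_batch(&self, digest: Digest) -> Option<Batch> {
        self.db.get(&digest).cloned()
    }
}

fn main() {
    let mut store = BatchStore::new();
    store.save(1, b"txns".to_vec());
    assert_eq!(store.get_batch(1), Some(b"txns".to_vec()));
}
```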
zekun000 committed Mar 7, 2023
1 parent 59f287d commit 52740b0
Showing 25 changed files with 447 additions and 982 deletions.
3 changes: 1 addition & 2 deletions consensus/consensus-types/src/proof_of_store.rs
@@ -11,7 +11,6 @@ use aptos_types::{
};
use rand::{seq::SliceRandom, thread_rng};
use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub struct LogicalTime {
@@ -78,7 +77,7 @@ impl SignedDigest {
expiration: LogicalTime,
num_txns: u64,
num_bytes: u64,
validator_signer: Arc<ValidatorSigner>,
validator_signer: &ValidatorSigner,
) -> Result<Self, CryptoMaterialError> {
let info = SignedDigestInfo::new(batch_author, digest, expiration, num_txns, num_bytes);
let signature = validator_signer.sign(&info)?;
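A side note on the signature change above: SignedDigest::new now borrows the signer rather than taking an Arc, since it only needs the signer for the duration of the call. A generic sketch of that pattern (Signer is a hypothetical stand-in, not the real ValidatorSigner):

```rust
use std::sync::Arc;

struct Signer;

impl Signer {
    fn sign(&self, _msg: &[u8]) -> u64 {
        0 // stub signature for the sketch
    }
}

// Borrowing is enough when the callee needs the signer only transiently;
// callers that hold an Arc<Signer> can pass &signer without cloning it.
fn sign_digest(signer: &Signer, msg: &[u8]) -> u64 {
    signer.sign(msg)
}

fn main() {
    let signer = Arc::new(Signer);
    let _sig = sign_digest(&signer, b"digest"); // Arc derefs to &Signer
}
```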
47 changes: 35 additions & 12 deletions consensus/src/epoch_manager.rs
@@ -29,13 +29,17 @@ use crate::{
logging::{LogEvent, LogSchema},
metrics_safety_rules::MetricsSafetyRules,
monitor,
network::{IncomingBlockRetrievalRequest, NetworkReceivers, NetworkSender},
network::{
IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest, NetworkReceivers,
NetworkSender,
},
network_interface::{ConsensusMsg, ConsensusNetworkClient},
payload_client::QuorumStoreClient,
persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
quorum_store::{
quorum_store_builder::{DirectMempoolInnerBuilder, InnerBuilder, QuorumStoreBuilder},
quorum_store_coordinator::CoordinatorCommand,
quorum_store_db::QuorumStoreDB,
},
recovery_manager::RecoveryManager,
round_manager::{RoundManager, UnverifiedEvent, VerifiedEvent},
@@ -80,7 +84,6 @@ use std::{
cmp::Ordering,
collections::HashMap,
mem::{discriminant, Discriminant},
path::PathBuf,
sync::Arc,
time::Duration,
};
@@ -124,9 +127,11 @@ pub struct EpochManager {
epoch_state: Option<EpochState>,
block_retrieval_tx:
Option<aptos_channel::Sender<AccountAddress, IncomingBlockRetrievalRequest>>,
quorum_store_storage_path: PathBuf,
quorum_store_msg_tx: Option<aptos_channel::Sender<AccountAddress, VerifiedEvent>>,
quorum_store_coordinator_tx: Option<Sender<CoordinatorCommand>>,
quorum_store_db: Arc<QuorumStoreDB>,
batch_retrieval_tx:
Option<aptos_channel::Sender<AccountAddress, IncomingBatchRetrievalRequest>>,
}

impl EpochManager {
@@ -145,6 +150,7 @@ impl EpochManager {
let config = node_config.consensus.clone();
let sr_config = &node_config.consensus.safety_rules;
let safety_rules_manager = SafetyRulesManager::new(sr_config);
let quorum_store_db = Arc::new(QuorumStoreDB::new(node_config.storage.dir()));
Self {
author,
config,
@@ -165,9 +171,10 @@ impl EpochManager {
round_manager_close_tx: None,
epoch_state: None,
block_retrieval_tx: None,
quorum_store_storage_path: node_config.storage.dir(),
quorum_store_msg_tx: None,
quorum_store_coordinator_tx: None,
quorum_store_db,
batch_retrieval_tx: None,
}
}

@@ -538,6 +545,7 @@ impl EpochManager {

// Shutdown the block retrieval task by dropping the sender
self.block_retrieval_tx = None;
self.batch_retrieval_tx = None;

if let Some(mut quorum_store_coordinator_tx) = self.quorum_store_coordinator_tx.take() {
let (ack_tx, ack_rx) = oneshot::channel();
@@ -644,7 +652,7 @@ impl EpochManager {
network_sender.clone(),
epoch_state.verifier.clone(),
self.config.safety_rules.backend.clone(),
self.quorum_store_storage_path.clone(),
self.quorum_store_db.clone(),
))
} else {
info!("Building DirectMempool");
@@ -689,8 +697,10 @@ impl EpochManager {
onchain_consensus_config.back_pressure_limit(),
payload_manager.clone(),
));
let (tx, rx) = aptos_channel::new(QueueStyle::LIFO, 1, None);
self.batch_retrieval_tx = Some(tx);

self.quorum_store_coordinator_tx = quorum_store_builder.start();
self.quorum_store_coordinator_tx = quorum_store_builder.start(rx);

info!(epoch = epoch, "Create ProposalGenerator");
// txn manager is required both by proposal generator (to pull the proposers)
@@ -909,8 +919,6 @@ impl EpochManager {
) -> anyhow::Result<()> {
match event {
UnverifiedEvent::FragmentMsg(_)
| UnverifiedEvent::BatchRequestMsg(_)
| UnverifiedEvent::BatchMsg(_)
| UnverifiedEvent::SignedDigestMsg(_)
| UnverifiedEvent::ProofOfStoreMsg(_) => {
if self.quorum_store_enabled {
@@ -938,9 +946,7 @@ impl EpochManager {
);
}
match event {
quorum_store_event @ (VerifiedEvent::BatchRequestMsg(_)
| VerifiedEvent::UnverifiedBatchMsg(_)
| VerifiedEvent::SignedDigestMsg(_)
quorum_store_event @ (VerifiedEvent::SignedDigestMsg(_)
| VerifiedEvent::ProofOfStoreMsg(_)
| VerifiedEvent::FragmentMsg(_)) => {
if let Some(sender) = &mut self.quorum_store_msg_tx {
@@ -987,6 +993,18 @@ impl EpochManager {
}
}

fn process_batch_retrieval(
&self,
peer_id: Author,
request: IncomingBatchRetrievalRequest,
) -> anyhow::Result<()> {
if let Some(tx) = &self.batch_retrieval_tx {
tx.push(peer_id, request)
} else {
Err(anyhow::anyhow!("Quorum store not started"))
}
}

fn process_local_timeout(&mut self, round: u64) {
self.forward_to_round_manager(self.author, VerifiedEvent::LocalTimeout(round));
}
@@ -1020,11 +1038,16 @@ impl EpochManager {
error!(epoch = self.epoch(), error = ?e, kind = error_kind(&e));
}
},
(peer, request) = network_receivers.block_retrieval.select_next_some() => {
(peer, request) = network_receivers.block_retrieval_rx.select_next_some() => {
if let Err(e) = self.process_block_retrieval(peer, request) {
error!(epoch = self.epoch(), error = ?e, kind = error_kind(&e));
}
},
(peer, request) = network_receivers.batch_retrieval_rx.select_next_some() => {
if let Err(e) = self.process_batch_retrieval(peer, request) {
error!(epoch = self.epoch(), error = ?e, kind = error_kind(&e));
}
},
round = round_timeout_sender_rx.select_next_some() => {
self.process_local_timeout(round);
},
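The epoch_manager wiring above follows one pattern throughout: per-epoch components are reachable through Option-wrapped senders that are populated on epoch start and dropped (set to None) on shutdown. A condensed sketch of the batch-retrieval forwarding, with std channels standing in for aptos_channel:

```rust
use std::sync::mpsc::{sync_channel, SyncSender};

type PeerId = u64;

#[derive(Debug)]
struct BatchRetrievalRequest {
    digest: u64,
}

// Mirrors the shape of EpochManager::process_batch_retrieval: the sender
// is an Option because the quorum store only exists while an epoch runs;
// dropping it on epoch shutdown closes the channel.
struct Manager {
    batch_retrieval_tx: Option<SyncSender<(PeerId, BatchRetrievalRequest)>>,
}

impl Manager {
    fn process_batch_retrieval(
        &self,
        peer: PeerId,
        req: BatchRetrievalRequest,
    ) -> Result<(), String> {
        match &self.batch_retrieval_tx {
            Some(tx) => tx.send((peer, req)).map_err(|e| e.to_string()),
            None => Err("Quorum store not started".to_string()),
        }
    }
}

fn main() {
    let (tx, rx) = sync_channel(1); // bounded, like the size-1 aptos_channel
    let mgr = Manager {
        batch_retrieval_tx: Some(tx),
    };
    mgr.process_batch_retrieval(7, BatchRetrievalRequest { digest: 42 })
        .unwrap();
    let (peer, req) = rx.recv().unwrap();
    println!("peer {peer} asked for {req:?}");
}
```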
62 changes: 56 additions & 6 deletions consensus/src/network.rs
@@ -53,6 +53,13 @@ pub struct IncomingBlockRetrievalRequest {
pub response_sender: oneshot::Sender<Result<Bytes, RpcError>>,
}

#[derive(Debug)]
pub struct IncomingBatchRetrievalRequest {
pub req: BatchRequest,
pub protocol: ProtocolId,
pub response_sender: oneshot::Sender<Result<Bytes, RpcError>>,
}

/// Just a convenience struct to keep all the network proxy receiving queues in one place.
/// Will be returned by the NetworkTask upon startup.
pub struct NetworkReceivers {
@@ -65,14 +72,23 @@ pub struct NetworkReceivers {
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
>,
pub block_retrieval:
pub block_retrieval_rx:
aptos_channel::Receiver<AccountAddress, (AccountAddress, IncomingBlockRetrievalRequest)>,
pub batch_retrieval_rx:
aptos_channel::Receiver<AccountAddress, (AccountAddress, IncomingBatchRetrievalRequest)>,
}

#[async_trait::async_trait]
pub(crate) trait QuorumStoreSender {
pub trait QuorumStoreSender: Send + Clone {
async fn send_batch_request(&self, request: BatchRequest, recipients: Vec<Author>);

async fn request_batch(
&self,
request: BatchRequest,
recipient: Author,
timeout: Duration,
) -> anyhow::Result<Batch>;

async fn send_batch(&self, batch: Batch, recipients: Vec<Author>);

async fn send_signed_digest(&self, signed_digest: SignedDigest, recipients: Vec<Author>);
@@ -311,6 +327,23 @@ impl QuorumStoreSender for NetworkSender {
self.send(msg, recipients).await
}

async fn request_batch(
&self,
request: BatchRequest,
recipient: Author,
timeout: Duration,
) -> anyhow::Result<Batch> {
let msg = ConsensusMsg::BatchRequestMsg(Box::new(request));
let response = self
.consensus_network_client
.send_rpc(recipient, msg, timeout)
.await?;
match response {
ConsensusMsg::BatchMsg(batch) => Ok(*batch),
_ => Err(anyhow!("Invalid batch response")),
}
}

async fn send_batch(&self, batch: Batch, recipients: Vec<Author>) {
fail_point!("consensus::send::batch", |_| ());
let msg = ConsensusMsg::BatchMsg(Box::new(batch));
@@ -347,6 +380,8 @@ pub struct NetworkTask {
>,
block_retrieval_tx:
aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingBlockRetrievalRequest)>,
batch_retrieval_tx:
aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingBatchRetrievalRequest)>,
all_events: Box<dyn Stream<Item = Event<ConsensusMsg>> + Send + Unpin>,
}

@@ -364,11 +399,13 @@ impl NetworkTask {
50,
Some(&counters::QUORUM_STORE_CHANNEL_MSGS),
);
let (block_retrieval_tx, block_retrieval) = aptos_channel::new(
let (block_retrieval_tx, block_retrieval_rx) = aptos_channel::new(
QueueStyle::LIFO,
1,
Some(&counters::BLOCK_RETRIEVAL_CHANNEL_MSGS),
);
let (batch_retrieval_tx, batch_retrieval_rx) =
aptos_channel::new(QueueStyle::LIFO, 1, None);

// Verify the network events have been constructed correctly
let network_and_events = network_service_events.into_network_and_events();
@@ -388,12 +425,14 @@ impl NetworkTask {
consensus_messages_tx,
quorum_store_messages_tx,
block_retrieval_tx,
batch_retrieval_tx,
all_events,
},
NetworkReceivers {
consensus_messages,
quorum_store_messages,
block_retrieval,
block_retrieval_rx,
batch_retrieval_rx,
},
)
}
@@ -422,10 +461,11 @@ impl NetworkTask {
.with_label_values(&[msg.name()])
.inc();
match msg {
ConsensusMsg::BatchRequestMsg(_) | ConsensusMsg::BatchMsg(_) => {
warn!("unexpected msg");
},
quorum_store_msg @ (ConsensusMsg::SignedDigestMsg(_)
| ConsensusMsg::FragmentMsg(_)
| ConsensusMsg::BatchRequestMsg(_)
| ConsensusMsg::BatchMsg(_)
| ConsensusMsg::ProofOfStoreMsg(_)) => {
Self::push_msg(
peer_id,
@@ -475,6 +515,16 @@ impl NetworkTask {
warn!(error = ?e, "aptos channel closed");
}
},
ConsensusMsg::BatchRequestMsg(request) => {
let req_with_callback = IncomingBatchRetrievalRequest {
req: *request,
protocol,
response_sender: callback,
};
let _ = self
.batch_retrieval_tx
.push(peer_id, (peer_id, req_with_callback));
},
_ => {
warn!(remote_peer = peer_id, "Unexpected msg: {:?}", msg);
continue;
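With request_batch in place, the refactored batch_requester can drive retrieval as a plain RPC loop instead of broadcasting a request and waiting for a BatchMsg to come back over a channel. A sketch of what a caller might look like (hypothetical; the types are simplified stand-ins, and it depends on the async-trait, anyhow, and tokio crates):

```rust
use std::time::Duration;

#[derive(Clone)]
struct BatchRequest {
    digest: u64,
}
struct Batch {
    digest: u64,
}
type Author = u64;

// Same shape as the request_batch RPC added to QuorumStoreSender above.
#[async_trait::async_trait]
trait QuorumStoreSender {
    async fn request_batch(
        &self,
        request: BatchRequest,
        recipient: Author,
        timeout: Duration,
    ) -> anyhow::Result<Batch>;
}

// Try each signer in turn until one of them returns the batch.
async fn fetch_batch<S: QuorumStoreSender>(
    sender: &S,
    request: BatchRequest,
    recipients: &[Author],
    timeout: Duration,
) -> anyhow::Result<Batch> {
    for &peer in recipients {
        if let Ok(batch) = sender.request_batch(request.clone(), peer, timeout).await {
            return Ok(batch);
        }
    }
    anyhow::bail!("no peer returned batch {}", request.digest)
}

// A trivial mock so the sketch runs end to end.
struct MockSender;

#[async_trait::async_trait]
impl QuorumStoreSender for MockSender {
    async fn request_batch(
        &self,
        request: BatchRequest,
        _recipient: Author,
        _timeout: Duration,
    ) -> anyhow::Result<Batch> {
        Ok(Batch { digest: request.digest })
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let req = BatchRequest { digest: 1 };
    let batch = fetch_batch(&MockSender, req, &[10, 11], Duration::from_secs(1)).await?;
    assert_eq!(batch.digest, 1);
    Ok(())
}
```

One consequence of the RPC shape is that timeouts and per-peer fallback surface as plain Err values at the call site, so the retry policy lives with the requester rather than in channel plumbing.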
6 changes: 3 additions & 3 deletions consensus/src/network_tests.rs
@@ -749,7 +749,7 @@ mod tests {
);

// verify request block rpc
let mut block_retrieval = receiver_1.block_retrieval;
let mut block_retrieval = receiver_1.block_retrieval_rx;
let on_request_block = async move {
while let Some((_, request)) = block_retrieval.next().await {
// make sure the network task is not blocked during RPC
@@ -824,13 +824,13 @@ mod tests {
.unwrap();

let f_check = async move {
assert!(network_receivers.block_retrieval.next().await.is_some());
assert!(network_receivers.block_retrieval_rx.next().await.is_some());

drop(peer_mgr_notifs_tx);
drop(connection_notifs_tx);
drop(self_sender);

assert!(network_receivers.block_retrieval.next().await.is_none());
assert!(network_receivers.block_retrieval_rx.next().await.is_none());
assert!(network_receivers.consensus_messages.next().await.is_none());
};
let f_network_task = network_task.start();
14 changes: 9 additions & 5 deletions consensus/src/payload_manager.rs
@@ -1,8 +1,9 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::quorum_store::{
batch_reader::BatchReader, quorum_store_coordinator::CoordinatorCommand,
use crate::{
network::NetworkSender,
quorum_store::{batch_reader::BatchReader, quorum_store_coordinator::CoordinatorCommand},
};
use aptos_consensus_types::{
block::Block,
@@ -22,14 +23,17 @@ use tokio::sync::oneshot;
/// If QuorumStore is enabled, has to ask BatchReader for the transaction behind the proofs of availability in the payload.
pub enum PayloadManager {
DirectMempool,
InQuorumStore(Arc<BatchReader>, Mutex<Sender<CoordinatorCommand>>),
InQuorumStore(
Arc<BatchReader<NetworkSender>>,
Mutex<Sender<CoordinatorCommand>>,
),
}

impl PayloadManager {
async fn request_transactions(
proofs: Vec<ProofOfStore>,
logical_time: LogicalTime,
batch_reader: &BatchReader,
batch_reader: &BatchReader<NetworkSender>,
) -> Vec<(
HashValue,
oneshot::Receiver<Result<Vec<SignedTransaction>, aptos_executor_types::Error>>,
@@ -43,7 +47,7 @@ impl PayloadManager {
logical_time
);
if logical_time <= pos.expiration() {
receivers.push((*pos.digest(), batch_reader.get_batch(pos).await));
receivers.push((*pos.digest(), batch_reader.get_batch(pos)));
} else {
debug!("QS: skipped expired pos");
}
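The payload_manager change above reflects the new BatchReader calling convention: get_batch is now a synchronous method that immediately returns a oneshot receiver, and only the receiver is awaited. A minimal sketch of that shape (simplified, using tokio's oneshot; not the real BatchReader):

```rust
use tokio::sync::oneshot;

type Digest = u8;
type Txns = Vec<u8>;

struct BatchReader;

impl BatchReader {
    // Synchronous: kicks off the lookup and hands back a receiver at once.
    fn get_batch(&self, digest: Digest) -> oneshot::Receiver<Txns> {
        let (tx, rx) = oneshot::channel();
        // The real component resolves this from local storage or by
        // requesting the batch from remote peers; the sketch answers inline.
        let _ = tx.send(vec![digest]);
        rx
    }
}

#[tokio::main]
async fn main() {
    let reader = BatchReader;
    let rx = reader.get_batch(7); // no .await on the call itself
    let txns = rx.await.expect("batch fetch failed");
    assert_eq!(txns, vec![7]);
}
```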
