[quorum store] refactor batch_reader/store and batch requester
This commit merges batch_reader and batch_store into a single struct and uses it as a synchronous component (via function calls) instead of an asynchronous one (via channels).

batch_requester is also refactored to use network RPC directly.
zekun000 committed Mar 8, 2023
1 parent e1a0237 commit f77d1d2
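For orientation, below is a minimal sketch of the direct-RPC batch request path, assuming simplified placeholder definitions for `Author`, `BatchRequest`, and `Batch` (the real types live in consensus-types) and a hypothetical `fetch_batch` helper. The `request_batch` signature mirrors the method added to the `QuorumStoreSender` trait in consensus/src/network.rs; the try-each-recipient loop is only an assumption about how the refactored batch_requester might drive it.

```rust
use std::time::Duration;

use anyhow::{anyhow, Result};
use async_trait::async_trait;

// Placeholder types; the real definitions come from consensus-types.
type Author = u64;

#[derive(Clone, Debug)]
pub struct BatchRequest {
    pub digest: [u8; 32],
}

#[derive(Clone, Debug)]
pub struct Batch {
    pub digest: [u8; 32],
    pub txns: Vec<Vec<u8>>,
}

// Mirrors the trait method added in consensus/src/network.rs: a direct RPC
// that returns the batch, instead of a fire-and-forget send answered later
// over a channel.
#[async_trait]
pub trait QuorumStoreSender: Send + Clone {
    async fn request_batch(
        &self,
        request: BatchRequest,
        recipient: Author,
        timeout: Duration,
    ) -> Result<Batch>;
}

// Hypothetical requester loop: try the signers one by one until an RPC
// succeeds. The real batch_requester's policy (ordering, parallelism,
// expiration checks) is not shown in this diff and may differ.
pub async fn fetch_batch<S: QuorumStoreSender>(
    sender: &S,
    request: BatchRequest,
    recipients: &[Author],
    rpc_timeout: Duration,
) -> Result<Batch> {
    for recipient in recipients {
        match sender
            .request_batch(request.clone(), *recipient, rpc_timeout)
            .await
        {
            Ok(batch) => return Ok(batch),
            Err(e) => eprintln!("batch request to {recipient} failed: {e:?}"),
        }
    }
    Err(anyhow!("no peer returned the requested batch"))
}
```

On the receiving side, the responder replies through the `response_sender` carried by `IncomingBatchRetrievalRequest`, as wired up in the network.rs and epoch_manager.rs hunks below.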
Showing 29 changed files with 860 additions and 1,364 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/lint-test.yaml
@@ -145,6 +145,8 @@ jobs:
with:
tool: nextest
- run: cargo nextest run --locked --workspace --exclude smoke-test --exclude aptos-testcases --exclude aptos-api --exclude aptos-executor-benchmark --exclude aptos-backup-cli --retries 3 --no-fail-fast -F consensus-only-perf-test
env:
RUST_MIN_STACK: 4297152

rust-smoke-test:
runs-on: high-perf-docker
3 changes: 1 addition & 2 deletions consensus/consensus-types/src/proof_of_store.rs
@@ -11,7 +11,6 @@ use aptos_types::{
};
use rand::{seq::SliceRandom, thread_rng};
use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub struct LogicalTime {
@@ -78,7 +77,7 @@ impl SignedDigest {
expiration: LogicalTime,
num_txns: u64,
num_bytes: u64,
validator_signer: Arc<ValidatorSigner>,
validator_signer: &ValidatorSigner,
) -> Result<Self, CryptoMaterialError> {
let info = SignedDigestInfo::new(batch_author, digest, expiration, num_txns, num_bytes);
let signature = validator_signer.sign(&info)?;
10 changes: 5 additions & 5 deletions consensus/src/counters.rs
@@ -634,17 +634,17 @@ pub static QUORUM_STORE_CHANNEL_MSGS: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Counters(queued,dequeued,dropped) related to block retrieval channel
pub static BLOCK_RETRIEVAL_CHANNEL_MSGS: Lazy<IntCounterVec> = Lazy::new(|| {
/// Counters(queued,dequeued,dropped) related to rpc request channel
pub static RPC_CHANNEL_MSGS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"aptos_consensus_block_retrieval_channel_msgs_count",
"Counters(queued,dequeued,dropped) related to block retrieval channel",
"aptos_consensus_rpc_channel_msgs_count",
"Counters(queued,dequeued,dropped) related to rpc request channel",
&["state"]
)
.unwrap()
});

/// Counters(queued,dequeued,dropped) related to block retrieval task
/// Counters(queued,dequeued,dropped) related to block retrieval per epoch task
pub static BLOCK_RETRIEVAL_TASK_MSGS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"aptos_consensus_block_retrieval_task_msgs_count",
51 changes: 35 additions & 16 deletions consensus/src/epoch_manager.rs
@@ -29,7 +29,10 @@ use crate::{
logging::{LogEvent, LogSchema},
metrics_safety_rules::MetricsSafetyRules,
monitor,
network::{IncomingBlockRetrievalRequest, NetworkReceivers, NetworkSender},
network::{
IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest, IncomingRpcRequest,
NetworkReceivers, NetworkSender,
},
network_interface::{ConsensusMsg, ConsensusNetworkClient},
payload_client::QuorumStoreClient,
persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
@@ -127,6 +130,8 @@ pub struct EpochManager {
quorum_store_msg_tx: Option<aptos_channel::Sender<AccountAddress, VerifiedEvent>>,
quorum_store_coordinator_tx: Option<Sender<CoordinatorCommand>>,
quorum_store_storage: Arc<dyn QuorumStoreStorage>,
batch_retrieval_tx:
Option<aptos_channel::Sender<AccountAddress, IncomingBatchRetrievalRequest>>,
}

impl EpochManager {
@@ -169,6 +174,7 @@ impl EpochManager {
quorum_store_msg_tx: None,
quorum_store_coordinator_tx: None,
quorum_store_storage,
batch_retrieval_tx: None,
}
}

@@ -539,6 +545,7 @@

// Shutdown the block retrieval task by dropping the sender
self.block_retrieval_tx = None;
self.batch_retrieval_tx = None;

if let Some(mut quorum_store_coordinator_tx) = self.quorum_store_coordinator_tx.take() {
let (ack_tx, ack_rx) = oneshot::channel();
@@ -693,7 +700,12 @@
payload_manager.clone(),
));

self.quorum_store_coordinator_tx = quorum_store_builder.start();
if let Some((quorum_store_coordinator_tx, batch_retrieval_rx)) =
quorum_store_builder.start()
{
self.quorum_store_coordinator_tx = Some(quorum_store_coordinator_tx);
self.batch_retrieval_tx = Some(batch_retrieval_rx);
}

info!(epoch = epoch, "Create ProposalGenerator");
// txn manager is required both by proposal generator (to pull the proposers)
@@ -912,8 +924,6 @@
) -> anyhow::Result<()> {
match event {
UnverifiedEvent::FragmentMsg(_)
| UnverifiedEvent::BatchRequestMsg(_)
| UnverifiedEvent::BatchMsg(_)
| UnverifiedEvent::SignedDigestMsg(_)
| UnverifiedEvent::ProofOfStoreMsg(_) => {
if self.quorum_store_enabled {
@@ -941,9 +951,7 @@
);
}
match event {
quorum_store_event @ (VerifiedEvent::BatchRequestMsg(_)
| VerifiedEvent::UnverifiedBatchMsg(_)
| VerifiedEvent::SignedDigestMsg(_)
quorum_store_event @ (VerifiedEvent::SignedDigestMsg(_)
| VerifiedEvent::ProofOfStoreMsg(_)
| VerifiedEvent::FragmentMsg(_)) => {
if let Some(sender) = &mut self.quorum_store_msg_tx {
Expand Down Expand Up @@ -975,18 +983,29 @@ impl EpochManager {
}
}

fn process_block_retrieval(
fn process_rpc_request(
&self,
peer_id: Author,
request: IncomingBlockRetrievalRequest,
request: IncomingRpcRequest,
) -> anyhow::Result<()> {
fail_point!("consensus::process::any", |_| {
Err(anyhow::anyhow!("Injected error in process_block_retrieval"))
Err(anyhow::anyhow!("Injected error in process_rpc_request"))
});
if let Some(tx) = &self.block_retrieval_tx {
tx.push(peer_id, request)
} else {
Err(anyhow::anyhow!("Round manager not started"))
match request {
IncomingRpcRequest::BlockRetrieval(request) => {
if let Some(tx) = &self.block_retrieval_tx {
tx.push(peer_id, request)
} else {
Err(anyhow::anyhow!("Round manager not started"))
}
},
IncomingRpcRequest::BatchRetrieval(request) => {
if let Some(tx) = &self.batch_retrieval_tx {
tx.push(peer_id, request)
} else {
Err(anyhow::anyhow!("Quorum store not started"))
}
},
}
}

@@ -1023,8 +1042,8 @@ impl EpochManager {
error!(epoch = self.epoch(), error = ?e, kind = error_kind(&e));
}
},
(peer, request) = network_receivers.block_retrieval.select_next_some() => {
if let Err(e) = self.process_block_retrieval(peer, request) {
(peer, request) = network_receivers.rpc_rx.select_next_some() => {
if let Err(e) = self.process_rpc_request(peer, request) {
error!(epoch = self.epoch(), error = ?e, kind = error_kind(&e));
}
},
1 change: 1 addition & 0 deletions consensus/src/logging.rs
@@ -22,6 +22,7 @@ pub enum LogEvent {
NewEpoch,
NewRound,
Propose,
ReceiveBatchRetrieval,
ReceiveBlockRetrieval,
ReceiveEpochChangeProof,
ReceiveEpochRetrieval,
97 changes: 74 additions & 23 deletions consensus/src/network.rs
@@ -53,6 +53,19 @@ pub struct IncomingBlockRetrievalRequest {
pub response_sender: oneshot::Sender<Result<Bytes, RpcError>>,
}

#[derive(Debug)]
pub struct IncomingBatchRetrievalRequest {
pub req: BatchRequest,
pub protocol: ProtocolId,
pub response_sender: oneshot::Sender<Result<Bytes, RpcError>>,
}

#[derive(Debug)]
pub enum IncomingRpcRequest {
BlockRetrieval(IncomingBlockRetrievalRequest),
BatchRetrieval(IncomingBatchRetrievalRequest),
}

/// Just a convenience struct to keep all the network proxy receiving queues in one place.
/// Will be returned by the NetworkTask upon startup.
pub struct NetworkReceivers {
@@ -65,14 +78,20 @@ pub struct NetworkReceivers {
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
>,
pub block_retrieval:
aptos_channel::Receiver<AccountAddress, (AccountAddress, IncomingBlockRetrievalRequest)>,
pub rpc_rx: aptos_channel::Receiver<AccountAddress, (AccountAddress, IncomingRpcRequest)>,
}

#[async_trait::async_trait]
pub(crate) trait QuorumStoreSender {
pub trait QuorumStoreSender: Send + Clone {
async fn send_batch_request(&self, request: BatchRequest, recipients: Vec<Author>);

async fn request_batch(
&self,
request: BatchRequest,
recipient: Author,
timeout: Duration,
) -> anyhow::Result<Batch>;

async fn send_batch(&self, batch: Batch, recipients: Vec<Author>);

async fn send_signed_digest(&self, signed_digest: SignedDigest, recipients: Vec<Author>);
@@ -311,6 +330,23 @@ impl QuorumStoreSender for NetworkSender {
self.send(msg, recipients).await
}

async fn request_batch(
&self,
request: BatchRequest,
recipient: Author,
timeout: Duration,
) -> anyhow::Result<Batch> {
let msg = ConsensusMsg::BatchRequestMsg(Box::new(request));
let response = self
.consensus_network_client
.send_rpc(recipient, msg, timeout)
.await?;
match response {
ConsensusMsg::BatchMsg(batch) => Ok(*batch),
_ => Err(anyhow!("Invalid batch response")),
}
}

async fn send_batch(&self, batch: Batch, recipients: Vec<Author>) {
fail_point!("consensus::send::batch", |_| ());
let msg = ConsensusMsg::BatchMsg(Box::new(batch));
@@ -345,8 +381,7 @@ pub struct NetworkTask {
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
>,
block_retrieval_tx:
aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingBlockRetrievalRequest)>,
rpc_tx: aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingRpcRequest)>,
all_events: Box<dyn Stream<Item = Event<ConsensusMsg>> + Send + Unpin>,
}

@@ -364,11 +399,8 @@ impl NetworkTask {
50,
Some(&counters::QUORUM_STORE_CHANNEL_MSGS),
);
let (block_retrieval_tx, block_retrieval) = aptos_channel::new(
QueueStyle::LIFO,
1,
Some(&counters::BLOCK_RETRIEVAL_CHANNEL_MSGS),
);
let (rpc_tx, rpc_rx) =
aptos_channel::new(QueueStyle::LIFO, 1, Some(&counters::RPC_CHANNEL_MSGS));

// Verify the network events have been constructed correctly
let network_and_events = network_service_events.into_network_and_events();
@@ -387,13 +419,13 @@
NetworkTask {
consensus_messages_tx,
quorum_store_messages_tx,
block_retrieval_tx,
rpc_tx,
all_events,
},
NetworkReceivers {
consensus_messages,
quorum_store_messages,
block_retrieval,
rpc_rx,
},
)
}
@@ -422,10 +454,11 @@ impl NetworkTask {
.with_label_values(&[msg.name()])
.inc();
match msg {
ConsensusMsg::BatchRequestMsg(_) | ConsensusMsg::BatchMsg(_) => {
warn!("unexpected msg");
},
quorum_store_msg @ (ConsensusMsg::SignedDigestMsg(_)
| ConsensusMsg::FragmentMsg(_)
| ConsensusMsg::BatchRequestMsg(_)
| ConsensusMsg::BatchMsg(_)
| ConsensusMsg::ProofOfStoreMsg(_)) => {
Self::push_msg(
peer_id,
Expand Down Expand Up @@ -463,15 +496,33 @@ impl NetworkTask {
);
continue;
}
let req_with_callback = IncomingBlockRetrievalRequest {
req: *request,
protocol,
response_sender: callback,
};
if let Err(e) = self
.block_retrieval_tx
.push(peer_id, (peer_id, req_with_callback))
{
let req_with_callback =
IncomingRpcRequest::BlockRetrieval(IncomingBlockRetrievalRequest {
req: *request,
protocol,
response_sender: callback,
});
if let Err(e) = self.rpc_tx.push(peer_id, (peer_id, req_with_callback)) {
warn!(error = ?e, "aptos channel closed");
}
},
ConsensusMsg::BatchRequestMsg(request) => {
counters::CONSENSUS_RECEIVED_MSGS
.with_label_values(&["BatchRetrievalRequest"])
.inc();
debug!(
remote_peer = peer_id,
event = LogEvent::ReceiveBatchRetrieval,
"{:?}",
request
);
let req_with_callback =
IncomingRpcRequest::BatchRetrieval(IncomingBatchRetrievalRequest {
req: *request,
protocol,
response_sender: callback,
});
if let Err(e) = self.rpc_tx.push(peer_id, (peer_id, req_with_callback)) {
warn!(error = ?e, "aptos channel closed");
}
},
17 changes: 11 additions & 6 deletions consensus/src/network_tests.rs
@@ -477,7 +477,7 @@ impl DropConfigRound {
mod tests {
use super::*;
use crate::{
network::NetworkTask,
network::{IncomingRpcRequest, NetworkTask},
network_interface::{DIRECT_SEND, RPC},
};
use aptos_config::network_id::{NetworkId, PeerNetworkId};
@@ -749,9 +749,9 @@ mod tests {
);

// verify request block rpc
let mut block_retrieval = receiver_1.block_retrieval;
let mut rpc_rx = receiver_1.rpc_rx;
let on_request_block = async move {
while let Some((_, request)) = block_retrieval.next().await {
while let Some((_, request)) = rpc_rx.next().await {
// make sure the network task is not blocked during RPC
// we limit the network notification queue size to 1 so if it's blocked,
// we can not process 2 votes and the test will timeout
@@ -764,7 +764,12 @@
BlockRetrievalResponse::new(BlockRetrievalStatus::IdNotFound, vec![]);
let response = ConsensusMsg::BlockRetrievalResponse(Box::new(response));
let bytes = Bytes::from(serde_json::to_vec(&response).unwrap());
request.response_sender.send(Ok(bytes)).unwrap();
match request {
IncomingRpcRequest::BlockRetrieval(request) => {
request.response_sender.send(Ok(bytes)).unwrap()
},
_ => panic!("unexpected message"),
}
}
};
runtime.handle().spawn(on_request_block);
@@ -824,13 +829,13 @@ mod tests {
.unwrap();

let f_check = async move {
assert!(network_receivers.block_retrieval.next().await.is_some());
assert!(network_receivers.rpc_rx.next().await.is_some());

drop(peer_mgr_notifs_tx);
drop(connection_notifs_tx);
drop(self_sender);

assert!(network_receivers.block_retrieval.next().await.is_none());
assert!(network_receivers.rpc_rx.next().await.is_none());
assert!(network_receivers.consensus_messages.next().await.is_none());
};
let f_network_task = network_task.start();
