[quorum store] refactor batch_reader/store and batch requester #6785

Merged · 1 commit · Mar 8, 2023
2 changes: 2 additions & 0 deletions .github/workflows/lint-test.yaml
@@ -145,6 +145,8 @@ jobs:
with:
tool: nextest
- run: cargo nextest run --locked --workspace --exclude smoke-test --exclude aptos-testcases --exclude aptos-api --exclude aptos-executor-benchmark --exclude aptos-backup-cli --retries 3 --no-fail-fast -F consensus-only-perf-test
env:
RUST_MIN_STACK: 4297152

rust-smoke-test:
runs-on: high-perf-docker
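
The added `RUST_MIN_STACK: 4297152` raises the default stack size (roughly 4 MiB) for threads spawned during the test run, guarding the consensus-only perf tests against stack overflows. As a rough, self-contained sketch of the same idea expressed in code rather than an environment variable (illustrative only, not part of this PR), `std::thread::Builder::stack_size` sets the stack for one spawned thread explicitly:

```rust
use std::thread;

fn main() {
    // RUST_MIN_STACK=4297152 makes ~4 MiB the default for spawned threads.
    // The programmatic equivalent for a single thread is an explicit stack size.
    let handle = thread::Builder::new()
        .name("deep-recursion".into())
        .stack_size(4 * 1024 * 1024) // ~4 MiB, mirroring the CI setting
        .spawn(|| {
            // Placeholder for work that needs a deep stack.
            fn depth(n: u64) -> u64 {
                if n == 0 { 0 } else { 1 + depth(n - 1) }
            }
            depth(10_000)
        })
        .expect("failed to spawn thread");

    println!("reached depth {}", handle.join().unwrap());
}
```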
3 changes: 1 addition & 2 deletions consensus/consensus-types/src/proof_of_store.rs
@@ -11,7 +11,6 @@ use aptos_types::{
};
use rand::{seq::SliceRandom, thread_rng};
use serde::{Deserialize, Serialize};
use std::sync::Arc;

#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub struct LogicalTime {
@@ -78,7 +77,7 @@ impl SignedDigest {
expiration: LogicalTime,
num_txns: u64,
num_bytes: u64,
validator_signer: Arc<ValidatorSigner>,
validator_signer: &ValidatorSigner,
) -> Result<Self, CryptoMaterialError> {
let info = SignedDigestInfo::new(batch_author, digest, expiration, num_txns, num_bytes);
let signature = validator_signer.sign(&info)?;
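
`SignedDigest::new` now borrows the signer (`&ValidatorSigner`) instead of taking an `Arc<ValidatorSigner>`: signing is a synchronous, read-only operation, so the constructor needs no shared ownership and callers stop cloning an `Arc` per signature. A minimal, self-contained sketch of that design choice, using hypothetical `Signer`/`SignedInfo` types rather than the Aptos API:

```rust
// Hypothetical types illustrating "borrow the signer, don't take Arc ownership".
struct Signer {
    key: u64,
}

impl Signer {
    // Stand-in for ValidatorSigner::sign: read-only, so &self is enough.
    fn sign(&self, payload: &[u8]) -> u64 {
        payload
            .iter()
            .fold(self.key, |acc, b| acc.wrapping_mul(31).wrapping_add(*b as u64))
    }
}

struct SignedInfo {
    payload: Vec<u8>,
    signature: u64,
}

impl SignedInfo {
    // Borrowing `&Signer` (like the new `&ValidatorSigner` parameter) lets many
    // call sites share one signer without cloning an Arc for every signature.
    fn new(payload: Vec<u8>, signer: &Signer) -> Self {
        let signature = signer.sign(&payload);
        Self { payload, signature }
    }
}

fn main() {
    let signer = Signer { key: 7 };
    let a = SignedInfo::new(b"batch-a".to_vec(), &signer);
    let b = SignedInfo::new(b"batch-b".to_vec(), &signer);
    println!(
        "{} {} ({} bytes signed)",
        a.signature,
        b.signature,
        a.payload.len() + b.payload.len()
    );
}
```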
10 changes: 5 additions & 5 deletions consensus/src/counters.rs
@@ -634,17 +634,17 @@ pub static QUORUM_STORE_CHANNEL_MSGS: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Counters(queued,dequeued,dropped) related to block retrieval channel
pub static BLOCK_RETRIEVAL_CHANNEL_MSGS: Lazy<IntCounterVec> = Lazy::new(|| {
/// Counters(queued,dequeued,dropped) related to rpc request channel
pub static RPC_CHANNEL_MSGS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"aptos_consensus_block_retrieval_channel_msgs_count",
"Counters(queued,dequeued,dropped) related to block retrieval channel",
"aptos_consensus_rpc_channel_msgs_count",
"Counters(queued,dequeued,dropped) related to rpc request channel",
&["state"]
)
.unwrap()
});

/// Counters(queued,dequeued,dropped) related to block retrieval task
/// Counters(queued,dequeued,dropped) related to block retrieval per epoch task
pub static BLOCK_RETRIEVAL_TASK_MSGS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"aptos_consensus_block_retrieval_task_msgs_count",
51 changes: 35 additions & 16 deletions consensus/src/epoch_manager.rs
@@ -29,7 +29,10 @@ use crate::{
logging::{LogEvent, LogSchema},
metrics_safety_rules::MetricsSafetyRules,
monitor,
network::{IncomingBlockRetrievalRequest, NetworkReceivers, NetworkSender},
network::{
IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest, IncomingRpcRequest,
NetworkReceivers, NetworkSender,
},
network_interface::{ConsensusMsg, ConsensusNetworkClient},
payload_client::QuorumStoreClient,
persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
@@ -127,6 +130,8 @@ pub struct EpochManager {
quorum_store_msg_tx: Option<aptos_channel::Sender<AccountAddress, VerifiedEvent>>,
quorum_store_coordinator_tx: Option<Sender<CoordinatorCommand>>,
quorum_store_storage: Arc<dyn QuorumStoreStorage>,
batch_retrieval_tx:
Option<aptos_channel::Sender<AccountAddress, IncomingBatchRetrievalRequest>>,
}

impl EpochManager {
@@ -169,6 +174,7 @@ impl EpochManager {
quorum_store_msg_tx: None,
quorum_store_coordinator_tx: None,
quorum_store_storage,
batch_retrieval_tx: None,
}
}

@@ -539,6 +545,7 @@ impl EpochManager {

// Shutdown the block retrieval task by dropping the sender
self.block_retrieval_tx = None;
self.batch_retrieval_tx = None;

if let Some(mut quorum_store_coordinator_tx) = self.quorum_store_coordinator_tx.take() {
let (ack_tx, ack_rx) = oneshot::channel();
@@ -693,7 +700,12 @@ impl EpochManager {
payload_manager.clone(),
));

self.quorum_store_coordinator_tx = quorum_store_builder.start();
if let Some((quorum_store_coordinator_tx, batch_retrieval_rx)) =
quorum_store_builder.start()
{
self.quorum_store_coordinator_tx = Some(quorum_store_coordinator_tx);
self.batch_retrieval_tx = Some(batch_retrieval_rx);
}

info!(epoch = epoch, "Create ProposalGenerator");
// txn manager is required both by proposal generator (to pull the proposers)
@@ -912,8 +924,6 @@
) -> anyhow::Result<()> {
match event {
UnverifiedEvent::FragmentMsg(_)
| UnverifiedEvent::BatchRequestMsg(_)
| UnverifiedEvent::BatchMsg(_)
| UnverifiedEvent::SignedDigestMsg(_)
| UnverifiedEvent::ProofOfStoreMsg(_) => {
if self.quorum_store_enabled {
@@ -941,9 +951,7 @@
);
}
match event {
quorum_store_event @ (VerifiedEvent::BatchRequestMsg(_)
| VerifiedEvent::UnverifiedBatchMsg(_)
| VerifiedEvent::SignedDigestMsg(_)
quorum_store_event @ (VerifiedEvent::SignedDigestMsg(_)
| VerifiedEvent::ProofOfStoreMsg(_)
| VerifiedEvent::FragmentMsg(_)) => {
if let Some(sender) = &mut self.quorum_store_msg_tx {
@@ -975,18 +983,29 @@
}
}

fn process_block_retrieval(
fn process_rpc_request(
&self,
peer_id: Author,
request: IncomingBlockRetrievalRequest,
request: IncomingRpcRequest,
) -> anyhow::Result<()> {
fail_point!("consensus::process::any", |_| {
Err(anyhow::anyhow!("Injected error in process_block_retrieval"))
Err(anyhow::anyhow!("Injected error in process_rpc_request"))
});
if let Some(tx) = &self.block_retrieval_tx {
tx.push(peer_id, request)
} else {
Err(anyhow::anyhow!("Round manager not started"))
match request {
IncomingRpcRequest::BlockRetrieval(request) => {
if let Some(tx) = &self.block_retrieval_tx {
tx.push(peer_id, request)
} else {
Err(anyhow::anyhow!("Round manager not started"))
}
},
IncomingRpcRequest::BatchRetrieval(request) => {
if let Some(tx) = &self.batch_retrieval_tx {
tx.push(peer_id, request)
} else {
Err(anyhow::anyhow!("Quorum store not started"))
}
},
}
}

@@ -1023,8 +1042,8 @@ impl EpochManager {
error!(epoch = self.epoch(), error = ?e, kind = error_kind(&e));
}
},
(peer, request) = network_receivers.block_retrieval.select_next_some() => {
if let Err(e) = self.process_block_retrieval(peer, request) {
(peer, request) = network_receivers.rpc_rx.select_next_some() => {
if let Err(e) = self.process_rpc_request(peer, request) {
error!(epoch = self.epoch(), error = ?e, kind = error_kind(&e));
}
},
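
With this change the epoch manager receives both block-retrieval and batch-retrieval RPCs from a single channel and dispatches on the request variant, forwarding each to a per-epoch sender that is reset to `None` on epoch shutdown. A self-contained sketch of that dispatch shape, using hypothetical request types and std channels in place of `aptos_channel` (illustrative, not the Aptos implementation):

```rust
use std::sync::mpsc::{channel, Sender};

// Hypothetical stand-ins for the two request kinds carried by IncomingRpcRequest.
struct BlockRetrievalRequest { block_id: u64 }
struct BatchRetrievalRequest { digest: u64 }

enum IncomingRpcRequest {
    BlockRetrieval(BlockRetrievalRequest),
    BatchRetrieval(BatchRetrievalRequest),
}

// Mirrors the epoch manager holding one optional sender per task; both are
// reset to None when an epoch shuts down, which closes the per-epoch channels.
struct EpochManager {
    block_retrieval_tx: Option<Sender<BlockRetrievalRequest>>,
    batch_retrieval_tx: Option<Sender<BatchRetrievalRequest>>,
}

impl EpochManager {
    fn process_rpc_request(&self, request: IncomingRpcRequest) -> Result<(), String> {
        match request {
            IncomingRpcRequest::BlockRetrieval(req) => self
                .block_retrieval_tx
                .as_ref()
                .ok_or_else(|| "round manager not started".to_string())?
                .send(req)
                .map_err(|e| e.to_string()),
            IncomingRpcRequest::BatchRetrieval(req) => self
                .batch_retrieval_tx
                .as_ref()
                .ok_or_else(|| "quorum store not started".to_string())?
                .send(req)
                .map_err(|e| e.to_string()),
        }
    }
}

fn main() {
    let (block_tx, block_rx) = channel();
    let manager = EpochManager {
        block_retrieval_tx: Some(block_tx),
        batch_retrieval_tx: None, // quorum store not started in this sketch
    };

    manager
        .process_rpc_request(IncomingRpcRequest::BlockRetrieval(BlockRetrievalRequest {
            block_id: 42,
        }))
        .unwrap();
    println!("forwarded block request {}", block_rx.recv().unwrap().block_id);

    let err = manager
        .process_rpc_request(IncomingRpcRequest::BatchRetrieval(BatchRetrievalRequest {
            digest: 7,
        }))
        .unwrap_err();
    println!("batch request rejected: {}", err);
}
```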
1 change: 1 addition & 0 deletions consensus/src/logging.rs
@@ -22,6 +22,7 @@ pub enum LogEvent {
NewEpoch,
NewRound,
Propose,
ReceiveBatchRetrieval,
ReceiveBlockRetrieval,
ReceiveEpochChangeProof,
ReceiveEpochRetrieval,
97 changes: 74 additions & 23 deletions consensus/src/network.rs
@@ -53,6 +53,19 @@ pub struct IncomingBlockRetrievalRequest {
pub response_sender: oneshot::Sender<Result<Bytes, RpcError>>,
}

#[derive(Debug)]
pub struct IncomingBatchRetrievalRequest {
pub req: BatchRequest,
pub protocol: ProtocolId,
pub response_sender: oneshot::Sender<Result<Bytes, RpcError>>,
}

#[derive(Debug)]
pub enum IncomingRpcRequest {
BlockRetrieval(IncomingBlockRetrievalRequest),
BatchRetrieval(IncomingBatchRetrievalRequest),
}

/// Just a convenience struct to keep all the network proxy receiving queues in one place.
/// Will be returned by the NetworkTask upon startup.
pub struct NetworkReceivers {
@@ -65,14 +78,20 @@ pub struct NetworkReceivers {
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
>,
pub block_retrieval:
aptos_channel::Receiver<AccountAddress, (AccountAddress, IncomingBlockRetrievalRequest)>,
pub rpc_rx: aptos_channel::Receiver<AccountAddress, (AccountAddress, IncomingRpcRequest)>,
}

#[async_trait::async_trait]
pub(crate) trait QuorumStoreSender {
pub trait QuorumStoreSender: Send + Clone {
async fn send_batch_request(&self, request: BatchRequest, recipients: Vec<Author>);

async fn request_batch(
&self,
request: BatchRequest,
recipient: Author,
timeout: Duration,
) -> anyhow::Result<Batch>;

async fn send_batch(&self, batch: Batch, recipients: Vec<Author>);

async fn send_signed_digest(&self, signed_digest: SignedDigest, recipients: Vec<Author>);
@@ -311,6 +330,23 @@ impl QuorumStoreSender for NetworkSender {
self.send(msg, recipients).await
}

async fn request_batch(
&self,
request: BatchRequest,
recipient: Author,
timeout: Duration,
) -> anyhow::Result<Batch> {
let msg = ConsensusMsg::BatchRequestMsg(Box::new(request));
let response = self
.consensus_network_client
.send_rpc(recipient, msg, timeout)
.await?;
match response {
ConsensusMsg::BatchMsg(batch) => Ok(*batch),
_ => Err(anyhow!("Invalid batch response")),
}
}

async fn send_batch(&self, batch: Batch, recipients: Vec<Author>) {
fail_point!("consensus::send::batch", |_| ());
let msg = ConsensusMsg::BatchMsg(Box::new(batch));
@@ -345,8 +381,7 @@ pub struct NetworkTask {
(AccountAddress, Discriminant<ConsensusMsg>),
(AccountAddress, ConsensusMsg),
>,
block_retrieval_tx:
aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingBlockRetrievalRequest)>,
rpc_tx: aptos_channel::Sender<AccountAddress, (AccountAddress, IncomingRpcRequest)>,
all_events: Box<dyn Stream<Item = Event<ConsensusMsg>> + Send + Unpin>,
}

@@ -364,11 +399,8 @@ impl NetworkTask {
50,
Some(&counters::QUORUM_STORE_CHANNEL_MSGS),
);
let (block_retrieval_tx, block_retrieval) = aptos_channel::new(
QueueStyle::LIFO,
1,
Some(&counters::BLOCK_RETRIEVAL_CHANNEL_MSGS),
);
let (rpc_tx, rpc_rx) =
aptos_channel::new(QueueStyle::LIFO, 1, Some(&counters::RPC_CHANNEL_MSGS));

// Verify the network events have been constructed correctly
let network_and_events = network_service_events.into_network_and_events();
@@ -387,13 +419,13 @@ impl NetworkTask {
NetworkTask {
consensus_messages_tx,
quorum_store_messages_tx,
block_retrieval_tx,
rpc_tx,
all_events,
},
NetworkReceivers {
consensus_messages,
quorum_store_messages,
block_retrieval,
rpc_rx,
},
)
}
@@ -422,10 +454,11 @@ impl NetworkTask {
.with_label_values(&[msg.name()])
.inc();
match msg {
ConsensusMsg::BatchRequestMsg(_) | ConsensusMsg::BatchMsg(_) => {
warn!("unexpected msg");
},
quorum_store_msg @ (ConsensusMsg::SignedDigestMsg(_)
| ConsensusMsg::FragmentMsg(_)
| ConsensusMsg::BatchRequestMsg(_)
| ConsensusMsg::BatchMsg(_)
| ConsensusMsg::ProofOfStoreMsg(_)) => {
Self::push_msg(
peer_id,
@@ -463,15 +496,33 @@
);
continue;
}
let req_with_callback = IncomingBlockRetrievalRequest {
req: *request,
protocol,
response_sender: callback,
};
if let Err(e) = self
.block_retrieval_tx
.push(peer_id, (peer_id, req_with_callback))
{
let req_with_callback =
IncomingRpcRequest::BlockRetrieval(IncomingBlockRetrievalRequest {
req: *request,
protocol,
response_sender: callback,
});
if let Err(e) = self.rpc_tx.push(peer_id, (peer_id, req_with_callback)) {
warn!(error = ?e, "aptos channel closed");
}
},
ConsensusMsg::BatchRequestMsg(request) => {
counters::CONSENSUS_RECEIVED_MSGS
.with_label_values(&["BatchRetrievalRequest"])
.inc();
debug!(
remote_peer = peer_id,
event = LogEvent::ReceiveBatchRetrieval,
"{:?}",
request
);
let req_with_callback =
IncomingRpcRequest::BatchRetrieval(IncomingBatchRetrievalRequest {
req: *request,
protocol,
response_sender: callback,
});
if let Err(e) = self.rpc_tx.push(peer_id, (peer_id, req_with_callback)) {
warn!(error = ?e, "aptos channel closed");
}
},
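
`QuorumStoreSender` gains `request_batch`, an RPC that sends a `BatchRequestMsg` and awaits a `BatchMsg` reply, rejecting any other response as invalid. A self-contained sketch of how a batch requester might drive such an RPC, trying recipients one by one until a valid batch comes back; the types and retry policy here are illustrative assumptions, not the Aptos implementation:

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the consensus types involved in the RPC.
type Author = &'static str;

#[derive(Clone)]
struct Batch { digest: u64, txns: Vec<u64> }

struct BatchRequest { digest: u64 }

// Models the request/response half of QuorumStoreSender: one call, one reply.
trait QuorumStoreSender {
    fn request_batch(&self, request: BatchRequest, recipient: Author) -> Result<Batch, String>;
}

// A toy "network" in which only some peers still store the batch.
struct LocalNetwork { stores: HashMap<Author, Batch> }

impl QuorumStoreSender for LocalNetwork {
    fn request_batch(&self, request: BatchRequest, recipient: Author) -> Result<Batch, String> {
        match self.stores.get(recipient) {
            Some(batch) if batch.digest == request.digest => Ok(batch.clone()),
            _ => Err(format!("{recipient}: batch not found")),
        }
    }
}

// Illustrative requester loop: ask peers one by one until a valid reply arrives.
fn fetch_batch(
    sender: &impl QuorumStoreSender,
    digest: u64,
    peers: &[Author],
) -> Result<Batch, String> {
    for &peer in peers {
        match sender.request_batch(BatchRequest { digest }, peer) {
            Ok(batch) => return Ok(batch),
            Err(e) => eprintln!("retrying after error: {e}"),
        }
    }
    Err("no peer returned the batch".to_string())
}

fn main() {
    let mut stores = HashMap::new();
    stores.insert("bob", Batch { digest: 9, txns: vec![1, 2, 3] });
    let network = LocalNetwork { stores };

    let batch = fetch_batch(&network, 9, &["alice", "bob", "carol"]).unwrap();
    println!("got batch {} with {} txns", batch.digest, batch.txns.len());
}
```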
17 changes: 11 additions & 6 deletions consensus/src/network_tests.rs
@@ -477,7 +477,7 @@ impl DropConfigRound {
mod tests {
use super::*;
use crate::{
network::NetworkTask,
network::{IncomingRpcRequest, NetworkTask},
network_interface::{DIRECT_SEND, RPC},
};
use aptos_config::network_id::{NetworkId, PeerNetworkId};
@@ -749,9 +749,9 @@ mod tests {
);

// verify request block rpc
let mut block_retrieval = receiver_1.block_retrieval;
let mut rpc_rx = receiver_1.rpc_rx;
let on_request_block = async move {
while let Some((_, request)) = block_retrieval.next().await {
while let Some((_, request)) = rpc_rx.next().await {
// make sure the network task is not blocked during RPC
// we limit the network notification queue size to 1 so if it's blocked,
// we can not process 2 votes and the test will timeout
@@ -764,7 +764,12 @@
BlockRetrievalResponse::new(BlockRetrievalStatus::IdNotFound, vec![]);
let response = ConsensusMsg::BlockRetrievalResponse(Box::new(response));
let bytes = Bytes::from(serde_json::to_vec(&response).unwrap());
request.response_sender.send(Ok(bytes)).unwrap();
match request {
IncomingRpcRequest::BlockRetrieval(request) => {
request.response_sender.send(Ok(bytes)).unwrap()
},
_ => panic!("unexpected message"),
}
}
};
runtime.handle().spawn(on_request_block);
@@ -824,13 +829,13 @@
.unwrap();

let f_check = async move {
assert!(network_receivers.block_retrieval.next().await.is_some());
assert!(network_receivers.rpc_rx.next().await.is_some());

drop(peer_mgr_notifs_tx);
drop(connection_notifs_tx);
drop(self_sender);

assert!(network_receivers.block_retrieval.next().await.is_none());
assert!(network_receivers.rpc_rx.next().await.is_none());
assert!(network_receivers.consensus_messages.next().await.is_none());
};
let f_network_task = network_task.start();