diff --git a/Cargo.lock b/Cargo.lock index a91b9549b8a74..6d4d3c24825c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -540,6 +540,7 @@ version = "0.1.0" dependencies = [ "anyhow", "aptos-bitvec", + "aptos-bounded-executor", "aptos-channels", "aptos-config", "aptos-consensus-notifications", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index e85b1efb9974f..ca83ad9c4519e 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -15,6 +15,7 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } aptos-bitvec = { workspace = true } +aptos-bounded-executor = { workspace = true } aptos-channels = { workspace = true } aptos-config = { workspace = true } aptos-consensus-notifications = { workspace = true } diff --git a/consensus/src/block_storage/block_store_test.rs b/consensus/src/block_storage/block_store_test.rs index 9fb0e1d0a81ee..197301763bbed 100644 --- a/consensus/src/block_storage/block_store_test.rs +++ b/consensus/src/block_storage/block_store_test.rs @@ -473,7 +473,7 @@ async fn test_need_sync_for_ledger_info() { assert!(block_store.need_sync_for_ledger_info(&ordered_too_far)); let committed_round_too_far = - block_store.commit_root().round() + block_store.back_pressure_limit * 2 + 1; + block_store.commit_root().round() + block_store.back_pressure_limit * 3 + 1; let committed_too_far = create_ledger_info(committed_round_too_far); assert!(block_store.need_sync_for_ledger_info(&committed_too_far)); diff --git a/consensus/src/block_storage/sync_manager.rs b/consensus/src/block_storage/sync_manager.rs index 646b0d0d9d123..6ea9b3f1c5623 100644 --- a/consensus/src/block_storage/sync_manager.rs +++ b/consensus/src/block_storage/sync_manager.rs @@ -47,7 +47,7 @@ impl BlockStore { pub fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool { (self.ordered_root().round() < li.commit_info().round() && !self.block_exists(li.commit_info().id())) - || self.commit_root().round() + 2 * self.back_pressure_limit < li.commit_info().round() + || self.commit_root().round() + 3 * self.back_pressure_limit < li.commit_info().round() } /// Checks if quorum certificate can be inserted in block store without RPC diff --git a/consensus/src/consensus_provider.rs b/consensus/src/consensus_provider.rs index 86cc50b730945..d48517cb357b3 100644 --- a/consensus/src/consensus_provider.rs +++ b/consensus/src/consensus_provider.rs @@ -12,6 +12,7 @@ use crate::{ txn_notifier::MempoolNotifier, util::time_service::ClockTimeService, }; +use aptos_bounded_executor::BoundedExecutor; use aptos_config::config::NodeConfig; use aptos_consensus_notifications::ConsensusNotificationSender; use aptos_event_notifications::ReconfigNotificationListener; @@ -57,6 +58,7 @@ pub fn start_consensus( let (self_sender, self_receiver) = aptos_channels::new(1_024, &counters::PENDING_SELF_MESSAGES); let consensus_network_client = ConsensusNetworkClient::new(network_client); + let bounded_executor = BoundedExecutor::new(4, runtime.handle().clone()); let epoch_mgr = EpochManager::new( node_config, time_service, @@ -67,6 +69,7 @@ pub fn start_consensus( state_computer, storage, reconfig_events, + bounded_executor, ); let (network_task, network_receiver) = NetworkTask::new(network_service_events, self_receiver); diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index a67942642b37d..392bdbd319bfd 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -43,6 +43,7 @@ use crate::{ util::time_service::TimeService, }; use anyhow::{bail, ensure, Context}; +use aptos_bounded_executor::BoundedExecutor; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::config::{ConsensusConfig, NodeConfig}; use aptos_consensus_types::{ @@ -78,6 +79,7 @@ use itertools::Itertools; use std::{ cmp::Ordering, collections::HashMap, + hash::Hash, mem::{discriminant, Discriminant}, path::PathBuf, sync::Arc, @@ -120,12 +122,13 @@ pub struct EpochManager { aptos_channel::Sender<(Author, Discriminant), (Author, VerifiedEvent)>, >, round_manager_close_tx: Option>>, - epoch_state: Option, + epoch_state: Option>, block_retrieval_tx: Option>, quorum_store_storage_path: PathBuf, quorum_store_msg_tx: Option>, quorum_store_coordinator_tx: Option>, + bounded_executor: BoundedExecutor, } impl EpochManager { @@ -139,6 +142,7 @@ impl EpochManager { commit_state_computer: Arc, storage: Arc, reconfig_events: ReconfigNotificationListener, + bounded_executor: BoundedExecutor, ) -> Self { let author = node_config.validator_network.as_ref().unwrap().peer_id(); let config = node_config.consensus.clone(); @@ -167,6 +171,7 @@ impl EpochManager { quorum_store_storage_path: node_config.storage.dir(), quorum_store_msg_tx: None, quorum_store_coordinator_tx: None, + bounded_executor, } } @@ -440,7 +445,7 @@ impl EpochManager { fn spawn_block_retrieval_task(&mut self, epoch: u64, block_store: Arc) { let (request_tx, mut request_rx) = aptos_channel::new( - QueueStyle::LIFO, + QueueStyle::FIFO, 1, Some(&counters::BLOCK_RETRIEVAL_TASK_MSGS), ); @@ -759,7 +764,7 @@ impl EpochManager { error!("Failed to read on-chain consensus config {}", error); } - self.epoch_state = Some(epoch_state.clone()); + self.epoch_state = Some(Arc::new(epoch_state.clone())); match self.storage.start() { LivenessStorageData::FullRecoveryData(initial_data) => { @@ -800,27 +805,43 @@ impl EpochManager { self.filter_quorum_store_events(peer_id, &unverified_event)?; // same epoch -> run well-formedness + signature check - let verified_event = monitor!( - "verify_message", - unverified_event.clone().verify( - peer_id, - &self.epoch_state().verifier, - self.quorum_store_enabled - ) - ) - .context("[EpochManager] Verify event") - .map_err(|err| { - error!( - SecurityEvent::ConsensusInvalidMessage, - remote_peer = peer_id, - error = ?err, - unverified_event = unverified_event - ); - err - })?; - - // process the verified event - self.process_event(peer_id, verified_event)?; + let epoch_state = self.epoch_state.clone().unwrap(); + let quorum_store_enabled = self.quorum_store_enabled; + let quorum_store_msg_tx = self.quorum_store_msg_tx.clone(); + let buffer_manager_msg_tx = self.buffer_manager_msg_tx.clone(); + let round_manager_tx = self.round_manager_tx.clone(); + let my_peer_id = self.author; + self.bounded_executor + .spawn(async move { + match monitor!( + "verify_message", + unverified_event.clone().verify( + peer_id, + &epoch_state.verifier, + quorum_store_enabled, + peer_id == my_peer_id, + ) + ) { + Ok(verified_event) => { + Self::forward_event( + quorum_store_msg_tx, + buffer_manager_msg_tx, + round_manager_tx, + peer_id, + verified_event, + ); + }, + Err(e) => { + error!( + SecurityEvent::ConsensusInvalidMessage, + remote_peer = peer_id, + error = ?e, + unverified_event = unverified_event + ); + }, + } + }) + .await; } Ok(()) } @@ -910,49 +931,55 @@ impl EpochManager { } } - fn process_event( - &mut self, + fn forward_event_to( + mut maybe_tx: Option>, + key: K, + value: V, + ) -> anyhow::Result<()> { + if let Some(tx) = &mut maybe_tx { + tx.push(key, value) + } else { + bail!("channel not initialized"); + } + } + + fn forward_event( + quorum_store_msg_tx: Option>, + buffer_manager_msg_tx: Option>, + round_manager_tx: Option< + aptos_channel::Sender<(Author, Discriminant), (Author, VerifiedEvent)>, + >, peer_id: AccountAddress, event: VerifiedEvent, - ) -> anyhow::Result<()> { + ) { if let VerifiedEvent::ProposalMsg(proposal) = &event { observe_block( proposal.proposal().timestamp_usecs(), BlockStage::EPOCH_MANAGER_VERIFIED, ); } - match event { + if let Err(e) = match event { quorum_store_event @ (VerifiedEvent::BatchRequestMsg(_) | VerifiedEvent::UnverifiedBatchMsg(_) | VerifiedEvent::SignedDigestMsg(_) | VerifiedEvent::ProofOfStoreMsg(_) | VerifiedEvent::FragmentMsg(_)) => { - if let Some(sender) = &mut self.quorum_store_msg_tx { - sender.push(peer_id, quorum_store_event)?; - } + Self::forward_event_to(quorum_store_msg_tx, peer_id, quorum_store_event) + .context("quorum store sender") }, buffer_manager_event @ (VerifiedEvent::CommitVote(_) | VerifiedEvent::CommitDecision(_)) => { - if let Some(sender) = &mut self.buffer_manager_msg_tx { - sender.push(peer_id, buffer_manager_event)?; - } else { - bail!("Commit Phase not started but received Commit Message (CommitVote/CommitDecision)"); - } + Self::forward_event_to(buffer_manager_msg_tx, peer_id, buffer_manager_event) + .context("buffer manager sender") }, - round_manager_event => { - self.forward_to_round_manager(peer_id, round_manager_event); - }, - } - Ok(()) - } - - fn forward_to_round_manager(&mut self, peer_id: Author, event: VerifiedEvent) { - let sender = self - .round_manager_tx - .as_mut() - .expect("RoundManager not started"); - if let Err(e) = sender.push((peer_id, discriminant(&event)), (peer_id, event)) { - error!("Failed to send event to round manager {:?}", e); + round_manager_event => Self::forward_event_to( + round_manager_tx, + (peer_id, discriminant(&round_manager_event)), + (peer_id, round_manager_event), + ) + .context("round manager sender"), + } { + warn!("Failed to forward event: {}", e); } } @@ -972,7 +999,15 @@ impl EpochManager { } fn process_local_timeout(&mut self, round: u64) { - self.forward_to_round_manager(self.author, VerifiedEvent::LocalTimeout(round)); + let peer_id = self.author; + let event = VerifiedEvent::LocalTimeout(round); + let sender = self + .round_manager_tx + .as_mut() + .expect("RoundManager not started"); + if let Err(e) = sender.push((peer_id, discriminant(&event)), (peer_id, event)) { + error!("Failed to send event to round manager {:?}", e); + } } async fn await_reconfig_notification(&mut self) { @@ -993,7 +1028,7 @@ impl EpochManager { // initial start of the processor self.await_reconfig_notification().await; loop { - ::futures::select! { + tokio::select! { (peer, msg) = network_receivers.consensus_messages.select_next_some() => { if let Err(e) = self.process_message(peer, msg).await { error!(epoch = self.epoch(), error = ?e, kind = error_kind(&e)); diff --git a/consensus/src/experimental/tests/buffer_manager_tests.rs b/consensus/src/experimental/tests/buffer_manager_tests.rs index 17dfad1cadfe1..94236f7797647 100644 --- a/consensus/src/experimental/tests/buffer_manager_tests.rs +++ b/consensus/src/experimental/tests/buffer_manager_tests.rs @@ -224,7 +224,10 @@ async fn loopback_commit_vote( let event: UnverifiedEvent = msg.into(); // verify the message and send the message into self loop msg_tx - .push(author, event.verify(author, verifier, false).unwrap()) + .push( + author, + event.verify(author, verifier, false, false).unwrap(), + ) .ok(); } }, diff --git a/consensus/src/network.rs b/consensus/src/network.rs index f26d95465da6c..95bbbd1a84580 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -56,7 +56,7 @@ pub struct IncomingBlockRetrievalRequest { /// Just a convenience struct to keep all the network proxy receiving queues in one place. /// Will be returned by the NetworkTask upon startup. pub struct NetworkReceivers { - /// Provide a LIFO buffer for each (Author, MessageType) key + /// Provide a FIFO buffer for each (Author, MessageType) key pub consensus_messages: aptos_channel::Receiver< (AccountAddress, Discriminant), (AccountAddress, ConsensusMsg), @@ -356,8 +356,11 @@ impl NetworkTask { network_service_events: NetworkServiceEvents, self_receiver: aptos_channels::Receiver>, ) -> (NetworkTask, NetworkReceivers) { - let (consensus_messages_tx, consensus_messages) = - aptos_channel::new(QueueStyle::LIFO, 1, Some(&counters::CONSENSUS_CHANNEL_MSGS)); + let (consensus_messages_tx, consensus_messages) = aptos_channel::new( + QueueStyle::FIFO, + 20, + Some(&counters::CONSENSUS_CHANNEL_MSGS), + ); let (quorum_store_messages_tx, quorum_store_messages) = aptos_channel::new( QueueStyle::FIFO, // TODO: tune this value based on quorum store messages with backpressure @@ -365,8 +368,8 @@ impl NetworkTask { Some(&counters::QUORUM_STORE_CHANNEL_MSGS), ); let (block_retrieval_tx, block_retrieval) = aptos_channel::new( - QueueStyle::LIFO, - 1, + QueueStyle::FIFO, + 10, Some(&counters::BLOCK_RETRIEVAL_CHANNEL_MSGS), ); diff --git a/consensus/src/network_tests.rs b/consensus/src/network_tests.rs index adc5215d54df2..79bd17a26cd22 100644 --- a/consensus/src/network_tests.rs +++ b/consensus/src/network_tests.rs @@ -104,6 +104,10 @@ impl NetworkPlayground { } } + pub fn handle(&self) -> Handle { + self.executor.clone() + } + /// HashMap of supported protocols to initialize ConsensusNetworkClient. pub fn peer_protocols(&self) -> Arc { self.peers_and_metadata.clone() diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index 9b2d627c8de6c..4718ed401afaf 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -83,29 +83,40 @@ impl UnverifiedEvent { peer_id: PeerId, validator: &ValidatorVerifier, quorum_store_enabled: bool, + self_message: bool, ) -> Result { Ok(match self { //TODO: no need to sign and verify the proposal UnverifiedEvent::ProposalMsg(p) => { - p.verify(validator, quorum_store_enabled)?; + if !self_message { + p.verify(validator, quorum_store_enabled)?; + } VerifiedEvent::ProposalMsg(p) }, UnverifiedEvent::VoteMsg(v) => { - v.verify(validator)?; + if !self_message { + v.verify(validator)?; + } VerifiedEvent::VoteMsg(v) }, // sync info verification is on-demand (verified when it's used) UnverifiedEvent::SyncInfo(s) => VerifiedEvent::UnverifiedSyncInfo(s), UnverifiedEvent::CommitVote(cv) => { - cv.verify(validator)?; + if !self_message { + cv.verify(validator)?; + } VerifiedEvent::CommitVote(cv) }, UnverifiedEvent::CommitDecision(cd) => { - cd.verify(validator)?; + if !self_message { + cd.verify(validator)?; + } VerifiedEvent::CommitDecision(cd) }, UnverifiedEvent::FragmentMsg(f) => { - f.verify(peer_id)?; + if !self_message { + f.verify(peer_id)?; + } VerifiedEvent::FragmentMsg(f) }, UnverifiedEvent::BatchRequestMsg(br) => { @@ -118,11 +129,15 @@ impl UnverifiedEvent { VerifiedEvent::UnverifiedBatchMsg(b) }, UnverifiedEvent::SignedDigestMsg(sd) => { - sd.verify(validator)?; + if !self_message { + sd.verify(validator)?; + } VerifiedEvent::SignedDigestMsg(sd) }, UnverifiedEvent::ProofOfStoreMsg(p) => { - p.verify(validator)?; + if !self_message { + p.verify(validator)?; + } VerifiedEvent::ProofOfStoreMsg(p) }, }) diff --git a/consensus/src/twins/twins_node.rs b/consensus/src/twins/twins_node.rs index d458d6383b088..95096c9f42534 100644 --- a/consensus/src/twins/twins_node.rs +++ b/consensus/src/twins/twins_node.rs @@ -13,6 +13,7 @@ use crate::{ test_utils::{MockStateComputer, MockStorage}, util::time_service::ClockTimeService, }; +use aptos_bounded_executor::BoundedExecutor; use aptos_channels::{self, aptos_channel, message_queues::QueueStyle}; use aptos_config::{ config::{NodeConfig, WaypointConfig}, @@ -136,6 +137,8 @@ impl SMRNode { let (self_sender, self_receiver) = aptos_channels::new(1_024, &counters::PENDING_SELF_MESSAGES); + let bounded_executor = BoundedExecutor::new(2, playground.handle()); + let epoch_mgr = EpochManager::new( &config, time_service, @@ -146,6 +149,7 @@ impl SMRNode { state_computer.clone(), storage.clone(), reconfig_listener, + bounded_executor, ); let (network_task, network_receiver) = NetworkTask::new(network_service_events, self_receiver); diff --git a/crates/channel/src/aptos_channel.rs b/crates/channel/src/aptos_channel.rs index a0ae94dfa30cd..cc1d15a8b3732 100644 --- a/crates/channel/src/aptos_channel.rs +++ b/crates/channel/src/aptos_channel.rs @@ -11,7 +11,6 @@ use crate::message_queues::{PerKeyQueue, QueueStyle}; use anyhow::{ensure, Result}; use aptos_infallible::{Mutex, NonZeroUsize}; -use aptos_logger::debug; use aptos_metrics_core::IntCounterVec; use futures::{ channel::oneshot, @@ -105,7 +104,6 @@ impl Sender { // notify the corresponding status channel if it was registered. if let Some((dropped_val, Some(dropped_status_ch))) = dropped { // Ignore errors. - debug!("QS: dropped message in aptos channel"); let _err = dropped_status_ch.send(ElementStatus::Dropped(dropped_val)); } if let Some(w) = shared_state.waker.take() {