diff --git a/Cargo.lock b/Cargo.lock index 82f07bf234461..d15a219ec9956 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -518,6 +518,7 @@ version = "0.1.0" dependencies = [ "anyhow", "aptos-bitvec", + "aptos-bounded-executor", "aptos-channels", "aptos-config", "aptos-consensus-notifications", diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index 90c4a322295d6..17ff58f299171 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -15,6 +15,7 @@ rust-version = { workspace = true } [dependencies] anyhow = { workspace = true } aptos-bitvec = { workspace = true } +aptos-bounded-executor = { workspace = true } aptos-channels = { workspace = true } aptos-config = { workspace = true } aptos-consensus-notifications = { workspace = true } diff --git a/consensus/src/block_storage/sync_manager.rs b/consensus/src/block_storage/sync_manager.rs index c4c0638ef3084..fe44270dbab27 100644 --- a/consensus/src/block_storage/sync_manager.rs +++ b/consensus/src/block_storage/sync_manager.rs @@ -46,7 +46,7 @@ impl BlockStore { pub fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool { (self.ordered_root().round() < li.commit_info().round() && !self.block_exists(li.commit_info().id())) - || self.commit_root().round() + 2 * self.back_pressure_limit < li.commit_info().round() + || self.commit_root().round() + 3 * self.back_pressure_limit < li.commit_info().round() } /// Checks if quorum certificate can be inserted in block store without RPC diff --git a/consensus/src/consensus_provider.rs b/consensus/src/consensus_provider.rs index 630ae69b8016c..f007e637d7a5f 100644 --- a/consensus/src/consensus_provider.rs +++ b/consensus/src/consensus_provider.rs @@ -11,6 +11,7 @@ use crate::{ txn_notifier::MempoolNotifier, util::time_service::ClockTimeService, }; +use aptos_bounded_executor::BoundedExecutor; use aptos_config::config::NodeConfig; use aptos_consensus_notifications::ConsensusNotificationSender; use aptos_event_notifications::ReconfigNotificationListener; @@ -60,6 +61,7 @@ pub fn start_consensus( state_sync_notifier, runtime.handle(), )); + let bounded_executor = BoundedExecutor::new(4, runtime.handle().clone()); let time_service = Arc::new(ClockTimeService::new(runtime.handle().clone())); @@ -78,6 +80,7 @@ pub fn start_consensus( state_computer, storage, reconfig_events, + bounded_executor, ); let (network_task, network_receiver) = NetworkTask::new(network_events, self_receiver); diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index 4e1fa0ec475fc..9f930bf641bc8 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -40,6 +40,7 @@ use crate::{ util::time_service::TimeService, }; use anyhow::{bail, ensure, Context}; +use aptos_bounded_executor::BoundedExecutor; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::config::{ConsensusConfig, NodeConfig}; use aptos_consensus_types::{ @@ -73,6 +74,7 @@ use futures::{ SinkExt, StreamExt, }; use itertools::Itertools; +use std::hash::Hash; use std::{ cmp::Ordering, collections::HashMap, @@ -117,9 +119,10 @@ pub struct EpochManager { aptos_channel::Sender<(Author, Discriminant), (Author, VerifiedEvent)>, >, round_manager_close_tx: Option>>, - epoch_state: Option, + epoch_state: Option>, block_retrieval_tx: Option>, + bounded_executor: BoundedExecutor, } impl EpochManager { @@ -133,6 +136,7 @@ impl EpochManager { commit_state_computer: Arc, storage: Arc, reconfig_events: ReconfigNotificationListener, + bounded_executor: BoundedExecutor, ) -> Self { let author = node_config.validator_network.as_ref().unwrap().peer_id(); let config = node_config.consensus.clone(); @@ -157,6 +161,7 @@ impl EpochManager { round_manager_close_tx: None, epoch_state: None, block_retrieval_tx: None, + bounded_executor, } } @@ -442,7 +447,7 @@ impl EpochManager { fn spawn_block_retrieval_task(&mut self, epoch: u64, block_store: Arc) { let (request_tx, mut request_rx) = aptos_channel::new( - QueueStyle::LIFO, + QueueStyle::FIFO, 1, Some(&counters::BLOCK_RETRIEVAL_TASK_MSGS), ); @@ -719,7 +724,7 @@ impl EpochManager { error!("Failed to read on-chain consensus config {}", error); } - self.epoch_state = Some(epoch_state.clone()); + self.epoch_state = Some(Arc::new(epoch_state.clone())); match self.storage.start() { LivenessStorageData::FullRecoveryData(initial_data) => { @@ -757,29 +762,87 @@ impl EpochManager { if let Some(unverified_event) = maybe_unverified_event { // same epoch -> run well-formedness + signature check - let verified_event = monitor!( - "verify_message", - unverified_event - .clone() - .verify(&self.epoch_state().verifier, self.quorum_store_enabled) - ) - .context("[EpochManager] Verify event") - .map_err(|err| { - error!( - SecurityEvent::ConsensusInvalidMessage, - remote_peer = peer_id, - error = ?err, - unverified_event = unverified_event - ); - err - })?; - - // process the verified event - self.process_event(peer_id, verified_event)?; + let epoch_state = self.epoch_state.clone().unwrap(); + let quorum_store_enabled = self.quorum_store_enabled; + let buffer_manager_msg_tx = self.buffer_manager_msg_tx.clone(); + let round_manager_tx = self.round_manager_tx.clone(); + let my_peer_id = self.author; + self.bounded_executor + .spawn(async move { + match monitor!( + "verify_message", + unverified_event.clone().verify( + &epoch_state.verifier, + quorum_store_enabled, + peer_id == my_peer_id, + ) + ) { + Ok(verified_event) => { + Self::forward_event( + buffer_manager_msg_tx, + round_manager_tx, + peer_id, + verified_event, + ); + } + Err(e) => { + error!( + SecurityEvent::ConsensusInvalidMessage, + remote_peer = peer_id, + error = ?e, + unverified_event = unverified_event + ); + } + } + }) + .await; } Ok(()) } + fn forward_event_to( + mut maybe_tx: Option>, + key: K, + value: V, + ) -> anyhow::Result<()> { + if let Some(tx) = &mut maybe_tx { + tx.push(key, value) + } else { + bail!("channel not initialized"); + } + } + + fn forward_event( + buffer_manager_msg_tx: Option>, + round_manager_tx: Option< + aptos_channel::Sender<(Author, Discriminant), (Author, VerifiedEvent)>, + >, + peer_id: AccountAddress, + event: VerifiedEvent, + ) { + if let VerifiedEvent::ProposalMsg(proposal) = &event { + observe_block( + proposal.proposal().timestamp_usecs(), + BlockStage::EPOCH_MANAGER_VERIFIED, + ); + } + if let Err(e) = match event { + buffer_manager_event @ (VerifiedEvent::CommitVote(_) + | VerifiedEvent::CommitDecision(_)) => { + Self::forward_event_to(buffer_manager_msg_tx, peer_id, buffer_manager_event) + .context("buffer manager sender") + } + round_manager_event => Self::forward_event_to( + round_manager_tx, + (peer_id, discriminant(&round_manager_event)), + (peer_id, round_manager_event), + ) + .context("round manager sender"), + } { + warn!("Failed to forward event: {}", e); + } + } + async fn check_epoch( &mut self, peer_id: AccountAddress, @@ -836,33 +899,6 @@ impl EpochManager { Ok(None) } - fn process_event( - &mut self, - peer_id: AccountAddress, - event: VerifiedEvent, - ) -> anyhow::Result<()> { - if let VerifiedEvent::ProposalMsg(proposal) = &event { - observe_block( - proposal.proposal().timestamp_usecs(), - BlockStage::EPOCH_MANAGER_VERIFIED, - ); - } - match event { - buffer_manager_event @ (VerifiedEvent::CommitVote(_) - | VerifiedEvent::CommitDecision(_)) => { - if let Some(sender) = &mut self.buffer_manager_msg_tx { - sender.push(peer_id, buffer_manager_event)?; - } else { - bail!("Commit Phase not started but received Commit Message (CommitVote/CommitDecision)"); - } - } - round_manager_event => { - self.forward_to_round_manager(peer_id, round_manager_event); - } - } - Ok(()) - } - fn forward_to_round_manager(&mut self, peer_id: Author, event: VerifiedEvent) { let sender = self .round_manager_tx diff --git a/consensus/src/experimental/tests/buffer_manager_tests.rs b/consensus/src/experimental/tests/buffer_manager_tests.rs index bac3f50cb4c37..4cbe545487353 100644 --- a/consensus/src/experimental/tests/buffer_manager_tests.rs +++ b/consensus/src/experimental/tests/buffer_manager_tests.rs @@ -206,7 +206,7 @@ async fn loopback_commit_vote( let event: UnverifiedEvent = msg.into(); // verify the message and send the message into self loop msg_tx - .push(author, event.verify(verifier, false).unwrap()) + .push(author, event.verify(verifier, false, false).unwrap()) .ok(); } } diff --git a/consensus/src/network.rs b/consensus/src/network.rs index dafddcd4a1894..c891acba8fd7b 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -50,7 +50,7 @@ pub struct IncomingBlockRetrievalRequest { /// Just a convenience struct to keep all the network proxy receiving queues in one place. /// Will be returned by the NetworkTask upon startup. pub struct NetworkReceivers { - /// Provide a LIFO buffer for each (Author, MessageType) key + /// Provide a FIFO buffer for each (Author, MessageType) key pub consensus_messages: aptos_channel::Receiver< (AccountAddress, Discriminant), (AccountAddress, ConsensusMsg), @@ -287,11 +287,14 @@ impl NetworkTask { network_events: ConsensusNetworkEvents, self_receiver: aptos_channels::Receiver>, ) -> (NetworkTask, NetworkReceivers) { - let (consensus_messages_tx, consensus_messages) = - aptos_channel::new(QueueStyle::LIFO, 1, Some(&counters::CONSENSUS_CHANNEL_MSGS)); + let (consensus_messages_tx, consensus_messages) = aptos_channel::new( + QueueStyle::FIFO, + 20, + Some(&counters::CONSENSUS_CHANNEL_MSGS), + ); let (block_retrieval_tx, block_retrieval) = aptos_channel::new( - QueueStyle::LIFO, - 1, + QueueStyle::FIFO, + 10, Some(&counters::BLOCK_RETRIEVAL_CHANNEL_MSGS), ); let all_events = Box::new(select(network_events, self_receiver)); diff --git a/consensus/src/network_tests.rs b/consensus/src/network_tests.rs index 3c447c264b229..5672aa82ecf6e 100644 --- a/consensus/src/network_tests.rs +++ b/consensus/src/network_tests.rs @@ -103,6 +103,10 @@ impl NetworkPlayground { } } + pub fn handle(&self) -> Handle { + self.executor.clone() + } + /// HashMap of supported protocols to initialize ConsensusNetworkSender. pub fn peer_protocols(&self) -> Arc { self.peer_metadata_storage.clone() diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs index 28f0ef136f984..87176d4bb7707 100644 --- a/consensus/src/round_manager.rs +++ b/consensus/src/round_manager.rs @@ -71,24 +71,33 @@ impl UnverifiedEvent { self, validator: &ValidatorVerifier, quorum_store_enabled: bool, + self_message: bool, ) -> Result { Ok(match self { UnverifiedEvent::ProposalMsg(p) => { - p.verify(validator, quorum_store_enabled)?; + if !self_message { + p.verify(validator, quorum_store_enabled)?; + } VerifiedEvent::ProposalMsg(p) } UnverifiedEvent::VoteMsg(v) => { - v.verify(validator)?; + if !self_message { + v.verify(validator)?; + } VerifiedEvent::VoteMsg(v) } // sync info verification is on-demand (verified when it's used) UnverifiedEvent::SyncInfo(s) => VerifiedEvent::UnverifiedSyncInfo(s), UnverifiedEvent::CommitVote(cv) => { - cv.verify(validator)?; + if !self_message { + cv.verify(validator)?; + } VerifiedEvent::CommitVote(cv) } UnverifiedEvent::CommitDecision(cd) => { - cd.verify(validator)?; + if !self_message { + cd.verify(validator)?; + } VerifiedEvent::CommitDecision(cd) } }) diff --git a/consensus/src/twins/twins_node.rs b/consensus/src/twins/twins_node.rs index 4b32503a69375..8055a1a3d233e 100644 --- a/consensus/src/twins/twins_node.rs +++ b/consensus/src/twins/twins_node.rs @@ -12,6 +12,7 @@ use crate::{ test_utils::{MockStateComputer, MockStorage}, util::time_service::ClockTimeService, }; +use aptos_bounded_executor::BoundedExecutor; use aptos_channels::{self, aptos_channel, message_queues::QueueStyle}; use aptos_config::{ config::{NodeConfig, WaypointConfig}, @@ -135,6 +136,7 @@ impl SMRNode { aptos_channels::new(1_024, &counters::PENDING_ROUND_TIMEOUTS); let (self_sender, self_receiver) = aptos_channels::new(1_024, &counters::PENDING_SELF_MESSAGES); + let bounded_executor = BoundedExecutor::new(2, playground.handle()); let epoch_mgr = EpochManager::new( &config, @@ -146,6 +148,7 @@ impl SMRNode { state_computer.clone(), storage.clone(), reconfig_listener, + bounded_executor, ); let (network_task, network_receiver) = NetworkTask::new(network_events, self_receiver);