diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index 48abb90e69d97..6c3b984f9110e 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -12,7 +12,7 @@ use super::{ order_rule::OrderRule, rb_handler::NodeBroadcastHandler, storage::DAGStorage, - types::DAGMessage, + types::{CertifiedNodeMessage, DAGMessage}, }; use crate::{ dag::{adapter::NotifierAdapter, dag_fetcher::DagFetcher}, @@ -195,18 +195,10 @@ impl DagBootstrapper { let mut shutdown_rx = shutdown_rx.into_stream(); loop { - let (rebootstrap_notification_tx, mut rebootstrap_notification_rx) = - tokio::sync::mpsc::channel(1); - - // TODO: fix let (dag_store, order_rule) = self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone()); - let state_sync_trigger = StateSyncTrigger::new( - dag_store.clone(), - adapter.clone(), - rebootstrap_notification_tx, - ); + let state_sync_trigger = StateSyncTrigger::new(dag_store.clone(), adapter.clone()); let (handler, fetch_service) = self.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger); @@ -221,17 +213,16 @@ impl DagBootstrapper { let _ = df_handle.await; return Ok(()); }, - Some(node) = rebootstrap_notification_rx.recv() => { + certified_node_msg = handler.run(&mut dag_rpc_rx) => { df_handle.abort(); let _ = df_handle.await; let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone()); - if let Err(e) = sync_manager.sync_dag_to(&node, dag_fetcher, dag_store.clone()).await { + if let Err(e) = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone()).await { error!(error = ?e, "unable to sync"); } - }, - _ = handler.start(&mut dag_rpc_rx) => {} + } } } } @@ -249,7 +240,7 @@ pub(super) fn bootstrap_dag_for_test( payload_client: Arc, state_computer: Arc, ) -> ( - JoinHandle<()>, + JoinHandle, JoinHandle<()>, aptos_channel::Sender, UnboundedReceiver, @@ -269,21 +260,16 @@ pub(super) fn bootstrap_dag_for_test( let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded(); let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, storage.clone())); let (dag_rpc_tx, mut dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None); - let (rebootstrap_notification_tx, _rebootstrap_notification_rx) = tokio::sync::mpsc::channel(1); let (dag_store, order_rule) = bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone()); - let state_sync_trigger = StateSyncTrigger::new( - dag_store.clone(), - adapter.clone(), - rebootstrap_notification_tx, - ); + let state_sync_trigger = StateSyncTrigger::new(dag_store.clone(), adapter.clone()); let (handler, fetch_service) = bootstraper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger); - let dh_handle = tokio::spawn(async move { handler.start(&mut dag_rpc_rx).await }); + let dh_handle = tokio::spawn(async move { handler.run(&mut dag_rpc_rx).await }); let df_handle = tokio::spawn(fetch_service.start()); (dh_handle, df_handle, dag_rpc_tx, ordered_nodes_rx) diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index 8aa498f283ef8..7f8c4f535335b 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -22,10 +22,7 @@ use aptos_infallible::RwLock; use aptos_logger::error; use aptos_reliable_broadcast::ReliableBroadcast; use aptos_time_service::{TimeService, TimeServiceTrait}; -use aptos_types::{ - block_info::{Round}, - epoch_state::EpochState, -}; +use aptos_types::{block_info::Round, epoch_state::EpochState}; use futures::{ future::{AbortHandle, Abortable}, FutureExt, diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs index ddb4620554188..d4405f813e137 100644 --- a/consensus/src/dag/dag_handler.rs +++ b/consensus/src/dag/dag_handler.rs @@ -3,8 +3,11 @@ use super::{ dag_driver::DagDriver, dag_fetcher::{FetchRequestHandler, FetchWaiter}, - dag_state_sync::StateSyncTrigger, - types::TDAGMessage, + dag_state_sync::{ + StateSyncStatus::{self, NeedsSync, Synced}, + StateSyncTrigger, + }, + types::{CertifiedNodeMessage, TDAGMessage}, CertifiedNode, Node, }; use crate::{ @@ -53,16 +56,23 @@ impl NetworkHandler { } } - pub async fn start( + pub async fn run( mut self, dag_rpc_rx: &mut aptos_channel::Receiver, - ) { + ) -> CertifiedNodeMessage { // TODO(ibalajiarun): clean up Reliable Broadcast storage periodically. loop { select! { Some(msg) = dag_rpc_rx.next() => { - if let Err(e) = self.process_rpc(msg).await { - warn!(error = ?e, "error processing rpc"); + match self.process_rpc(msg).await { + Ok(sync_status) => { + if let StateSyncStatus::NeedsSync(certified_node_msg) = sync_status { + return certified_node_msg; + } + }, + Err(e) => { + warn!(error = ?e, "error processing rpc"); + } } }, Some(res) = self.node_fetch_waiter.next() => { @@ -79,7 +89,10 @@ impl NetworkHandler { } } - async fn process_rpc(&mut self, rpc_request: IncomingDAGRequest) -> anyhow::Result<()> { + async fn process_rpc( + &mut self, + rpc_request: IncomingDAGRequest, + ) -> anyhow::Result { let dag_message: DAGMessage = rpc_request.req.try_into()?; let author = dag_message @@ -94,14 +107,18 @@ impl NetworkHandler { .verify(&self.epoch_state.verifier) .and_then(|_| self.node_receiver.process(node)) .map(|r| r.into()), - DAGMessage::CertifiedNodeMsg(node) => match node.verify(&self.epoch_state.verifier) { - Ok(_) => { - let node = self.state_sync_trigger.check(node).await - self.dag_driver - .process(node.certified_node()) - .map(|r| r.into()) - }, - Err(e) => Err(e), + DAGMessage::CertifiedNodeMsg(certified_node_msg) => { + match certified_node_msg.verify(&self.epoch_state.verifier) { + Ok(_) => match self.state_sync_trigger.check(certified_node_msg).await { + ret @ (NeedsSync(_), None) => return Ok(ret.0), + (Synced, Some(certified_node_msg)) => self + .dag_driver + .process(certified_node_msg.certified_node()) + .map(|r| r.into()), + _ => unreachable!(), + }, + Err(e) => Err(e), + } }, DAGMessage::FetchRequest(request) => request .verify(&self.epoch_state.verifier) @@ -126,6 +143,6 @@ impl NetworkHandler { .response_sender .send(response) .map_err(|_| anyhow::anyhow!("unable to respond to rpc")) - .into() + .map(|_| StateSyncStatus::Synced) } } diff --git a/consensus/src/dag/dag_state_sync.rs b/consensus/src/dag/dag_state_sync.rs index 554c9ef5e117b..32af4532e2c3a 100644 --- a/consensus/src/dag/dag_state_sync.rs +++ b/consensus/src/dag/dag_state_sync.rs @@ -23,40 +23,39 @@ use std::sync::Arc; pub const DAG_WINDOW: usize = 1; pub const STATE_SYNC_WINDOW_MULTIPLIER: usize = 30; +pub(super) enum StateSyncStatus { + NeedsSync(CertifiedNodeMessage), + Synced, +} + pub(super) struct StateSyncTrigger { dag_store: Arc>, downstream_notifier: Arc, - bootstrap_notifier: tokio::sync::mpsc::Sender, } impl StateSyncTrigger { - pub(super) fn new( - dag_store: Arc>, - downstream_notifier: Arc, - bootstrap_notifier: tokio::sync::mpsc::Sender, - ) -> Self { + pub(super) fn new(dag_store: Arc>, downstream_notifier: Arc) -> Self { Self { dag_store, downstream_notifier, - bootstrap_notifier, } } /// This method checks if a state sync is required, and if so, /// notifies the bootstraper and yields the current task infinitely, /// to let the bootstraper can abort this task. - pub(super) async fn check(&self, node: CertifiedNodeMessage) -> anyhow::Result { + pub(super) async fn check( + &self, + node: CertifiedNodeMessage, + ) -> (StateSyncStatus, Option) { let ledger_info = node.ledger_info(); self.notify_commit_proof(ledger_info).await; if self.need_sync_for_ledger_info(ledger_info) { - self.bootstrap_notifier.send(node).await?; - loop { - tokio::task::yield_now().await; - } + return (StateSyncStatus::NeedsSync(node), None); } - Ok(node) + (StateSyncStatus::Synced, Some(node)) } /// Fast forward in the decoupled-execution pipeline if the block exists there diff --git a/consensus/src/dag/tests/dag_state_sync_tests.rs b/consensus/src/dag/tests/dag_state_sync_tests.rs index 24317bc5ce59a..e47d654ec1b3a 100644 --- a/consensus/src/dag/tests/dag_state_sync_tests.rs +++ b/consensus/src/dag/tests/dag_state_sync_tests.rs @@ -200,8 +200,5 @@ async fn test_dag_state_sync() { assert_eq!(new_dag.lowest_round(), (LI_ROUNDS - DAG_WINDOW) as Round); assert_eq!(new_dag.highest_round(), (NUM_ROUNDS - 1) as Round); assert_none!(new_dag.highest_ordered_anchor_round(),); - assert_eq!( - new_dag.highest_committed_anchor_round(), - LI_ROUNDS as Round - ); + assert_eq!(new_dag.highest_committed_anchor_round(), LI_ROUNDS as Round); } diff --git a/consensus/src/dag/tests/integration_tests.rs b/consensus/src/dag/tests/integration_tests.rs index ee92a0cd8e76b..ace3e4d0941b2 100644 --- a/consensus/src/dag/tests/integration_tests.rs +++ b/consensus/src/dag/tests/integration_tests.rs @@ -2,14 +2,12 @@ use super::dag_test; use crate::{ - dag::bootstrap::bootstrap_dag_for_test, + dag::{bootstrap::bootstrap_dag_for_test, types::CertifiedNodeMessage}, experimental::buffer_manager::OrderedBlocks, network::{DAGNetworkSenderImpl, IncomingDAGRequest, NetworkSender}, network_interface::{ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC}, network_tests::{NetworkPlayground, TwinId}, - test_utils::{ - consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStorage, - }, + test_utils::{consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStorage}, }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::network_id::{NetworkId, PeerNetworkId}; @@ -43,7 +41,7 @@ use std::sync::Arc; use tokio::task::JoinHandle; struct DagBootstrapUnit { - nh_task_handle: JoinHandle<()>, + nh_task_handle: JoinHandle, df_task_handle: JoinHandle<()>, dag_rpc_tx: aptos_channel::Sender, network_events: