diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index 45fd5b6dd9d7e..48abb90e69d97 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -24,7 +24,7 @@ use aptos_channels::{ aptos_channel::{self, Receiver}, message_queues::QueueStyle, }; -use aptos_consensus_types::common::{Author, Round}; +use aptos_consensus_types::common::Author; use aptos_crypto::HashValue; use aptos_infallible::RwLock; use aptos_logger::error; @@ -84,14 +84,13 @@ impl DagBootstrapper { fn bootstrap_dag_store( &self, - initial_round: Round, latest_ledger_info: LedgerInfo, notifier: Arc, ) -> (Arc>, OrderRule) { let dag = Arc::new(RwLock::new(Dag::new( self.epoch_state.clone(), self.storage.clone(), - initial_round, + latest_ledger_info.round(), DAG_WINDOW, ))); @@ -177,10 +176,7 @@ impl DagBootstrapper { ordered_nodes_tx: UnboundedSender, shutdown_rx: oneshot::Receiver<()>, ) -> anyhow::Result<()> { - let adapter = Arc::new(NotifierAdapter::new( - ordered_nodes_tx, - self.storage.clone(), - )); + let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, self.storage.clone())); let sync_manager = DagStateSynchronizer::new( self.epoch_state.clone(), @@ -204,7 +200,7 @@ impl DagBootstrapper { // TODO: fix let (dag_store, order_rule) = - self.bootstrap_dag_store(0, ledger_info.ledger_info().clone(), adapter.clone()); + self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone()); let state_sync_trigger = StateSyncTrigger::new( dag_store.clone(), @@ -276,7 +272,7 @@ pub(super) fn bootstrap_dag_for_test( let (rebootstrap_notification_tx, _rebootstrap_notification_rx) = tokio::sync::mpsc::channel(1); let (dag_store, order_rule) = - bootstraper.bootstrap_dag_store(0, latest_ledger_info, adapter.clone()); + bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone()); let state_sync_trigger = StateSyncTrigger::new( dag_store.clone(), @@ -287,9 +283,7 @@ pub(super) fn bootstrap_dag_for_test( let (handler, fetch_service) = bootstraper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger); - let dh_handle = tokio::spawn(async move { - handler.start(&mut dag_rpc_rx).await - }); + let dh_handle = tokio::spawn(async move { handler.start(&mut dag_rpc_rx).await }); let df_handle = tokio::spawn(fetch_service.start()); (dh_handle, df_handle, dag_rpc_tx, ordered_nodes_rx) diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index 84e32feb0d579..8aa498f283ef8 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -23,10 +23,8 @@ use aptos_logger::error; use aptos_reliable_broadcast::ReliableBroadcast; use aptos_time_service::{TimeService, TimeServiceTrait}; use aptos_types::{ - aggregate_signature::AggregateSignature, - block_info::{BlockInfo, Round}, + block_info::{Round}, epoch_state::EpochState, - ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, }; use futures::{ future::{AbortHandle, Abortable}, diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs index 3a9c35cbff899..ddb4620554188 100644 --- a/consensus/src/dag/dag_handler.rs +++ b/consensus/src/dag/dag_handler.rs @@ -96,7 +96,7 @@ impl NetworkHandler { .map(|r| r.into()), DAGMessage::CertifiedNodeMsg(node) => match node.verify(&self.epoch_state.verifier) { Ok(_) => { - let node = self.state_sync_trigger.check(node).await; + let node = self.state_sync_trigger.check(node).await self.dag_driver .process(node.certified_node()) .map(|r| r.into()) diff --git a/consensus/src/dag/dag_state_sync.rs b/consensus/src/dag/dag_state_sync.rs index a98e11136674d..554c9ef5e117b 100644 --- a/consensus/src/dag/dag_state_sync.rs +++ b/consensus/src/dag/dag_state_sync.rs @@ -45,18 +45,18 @@ impl StateSyncTrigger { /// This method checks if a state sync is required, and if so, /// notifies the bootstraper and yields the current task infinitely, /// to let the bootstraper can abort this task. - pub(super) async fn check(&self, node: CertifiedNodeMessage) -> CertifiedNodeMessage { + pub(super) async fn check(&self, node: CertifiedNodeMessage) -> anyhow::Result { let ledger_info = node.ledger_info(); self.notify_commit_proof(ledger_info).await; if self.need_sync_for_ledger_info(ledger_info) { - self.bootstrap_notifier.send(node).await; + self.bootstrap_notifier.send(node).await?; loop { tokio::task::yield_now().await; } } - node + Ok(node) } /// Fast forward in the decoupled-execution pipeline if the block exists there @@ -127,7 +127,7 @@ impl DagStateSynchronizer { node: &CertifiedNodeMessage, dag_fetcher: impl TDagFetcher, current_dag_store: Arc>, - ) -> anyhow::Result<()> { + ) -> anyhow::Result> { let commit_li = node.ledger_info(); { @@ -151,7 +151,7 @@ impl DagStateSynchronizer { )) .await; // TODO: make sure to terminate DAG and yield to epoch manager - return Ok(()); + return Ok(None); } // TODO: there is a case where DAG fetches missing nodes in window and a crash happens and when we restart, @@ -197,6 +197,6 @@ impl DagStateSynchronizer { // TODO: the caller should rebootstrap the order rule - Ok(()) + Ok(Arc::into_inner(sync_dag_store).map(|r| r.into_inner())) } } diff --git a/consensus/src/dag/tests/dag_state_sync_tests.rs b/consensus/src/dag/tests/dag_state_sync_tests.rs index ec9b16db3d84d..24317bc5ce59a 100644 --- a/consensus/src/dag/tests/dag_state_sync_tests.rs +++ b/consensus/src/dag/tests/dag_state_sync_tests.rs @@ -4,7 +4,7 @@ use crate::{ dag::{ adapter::Notifier, dag_fetcher::{FetchRequestHandler, TDagFetcher}, - dag_state_sync::DAG_WINDOW, + dag_state_sync::{DagStateSynchronizer, DAG_WINDOW}, dag_store::Dag, storage::DAGStorage, tests::{dag_test::MockStorage, helpers::generate_dag_nodes}, @@ -112,24 +112,17 @@ impl Notifier for MockNotifier { async fn send_commit_proof(&self, _ledger_info: LedgerInfoWithSignatures) {} } -fn setup( - epoch_state: Arc, - dag_store: Arc>, - storage: Arc, -) -> StateSyncManager { - let network = Arc::new(MockDAGNetworkSender {}); +fn setup(epoch_state: Arc, storage: Arc) -> DagStateSynchronizer { let time_service = TimeService::mock(); let state_computer = Arc::new(EmptyStateComputer {}); - let upstream_notifier = Arc::new(MockNotifier {}); + let downstream_notifier = Arc::new(MockNotifier {}); - StateSyncManager::new( + DagStateSynchronizer::new( epoch_state, - network, - upstream_notifier, + downstream_notifier, time_service, state_computer, storage, - dag_store, ) } @@ -193,24 +186,22 @@ async fn test_dag_state_sync() { let sync_node_li = CertifiedNodeMessage::new(sync_to_node, sync_to_li); - let state_sync = setup(epoch_state.clone(), slow_dag.clone(), storage.clone()); - let dag_fetcher = Arc::new(MockDagFetcher { + let state_sync = setup(epoch_state.clone(), storage.clone()); + let dag_fetcher = MockDagFetcher { target_dag: fast_dag.clone(), epoch_state: epoch_state.clone(), - }); + }; let sync_result = state_sync - .sync_to_highest_ordered_anchor(&sync_node_li, dag_fetcher) + .sync_dag_to(&sync_node_li, dag_fetcher, slow_dag.clone()) .await; let new_dag = sync_result.unwrap().unwrap(); - let dag_reader = new_dag.read(); - - assert_eq!(dag_reader.lowest_round(), (LI_ROUNDS - DAG_WINDOW) as Round); - assert_eq!(dag_reader.highest_round(), (NUM_ROUNDS - 1) as Round); - assert_none!(dag_reader.highest_ordered_anchor_round(),); + assert_eq!(new_dag.lowest_round(), (LI_ROUNDS - DAG_WINDOW) as Round); + assert_eq!(new_dag.highest_round(), (NUM_ROUNDS - 1) as Round); + assert_none!(new_dag.highest_ordered_anchor_round(),); assert_eq!( - dag_reader.highest_committed_anchor_round(), + new_dag.highest_committed_anchor_round(), LI_ROUNDS as Round ); } diff --git a/consensus/src/dag/tests/integration_tests.rs b/consensus/src/dag/tests/integration_tests.rs index a812800e213a2..ee92a0cd8e76b 100644 --- a/consensus/src/dag/tests/integration_tests.rs +++ b/consensus/src/dag/tests/integration_tests.rs @@ -8,7 +8,7 @@ use crate::{ network_interface::{ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC}, network_tests::{NetworkPlayground, TwinId}, test_utils::{ - consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStateComputer, MockStorage, + consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStorage, }, }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; @@ -25,7 +25,6 @@ use aptos_network::{ transport::ConnectionMetadata, ProtocolId, }; -use aptos_storage_interface::mock::MockDbReaderWriter; use aptos_time_service::TimeService; use aptos_types::{ epoch_state::EpochState, @@ -35,7 +34,7 @@ use aptos_types::{ }; use claims::assert_gt; use futures::{ - stream::{select, AbortHandle, Select}, + stream::{select, Select}, StreamExt, }; use futures_channel::mpsc::UnboundedReceiver;