diff --git a/consensus/src/dag/adapter.rs b/consensus/src/dag/adapter.rs
index 203502b0a4122d..b649eb1398c14d 100644
--- a/consensus/src/dag/adapter.rs
+++ b/consensus/src/dag/adapter.rs
@@ -33,9 +33,9 @@ use futures_channel::mpsc::UnboundedSender;
 use std::{collections::HashMap, sync::Arc};
 
 #[async_trait]
-pub trait Notifier: Send {
+pub trait Notifier: Send + Sync {
     fn send_ordered_nodes(
-        &mut self,
+        &self,
         ordered_nodes: Vec<Arc<CertifiedNode>>,
         failed_author: Vec<(Round, Author)>,
     ) -> anyhow::Result<()>;
@@ -64,7 +64,7 @@ impl NotifierAdapter {
 #[async_trait]
 impl Notifier for NotifierAdapter {
     fn send_ordered_nodes(
-        &mut self,
+        &self,
         ordered_nodes: Vec<Arc<CertifiedNode>>,
         failed_author: Vec<(Round, Author)>,
     ) -> anyhow::Result<()> {
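Note on the Notifier change above: with `Send + Sync` supertraits and `&self` methods, a single notifier instance can be shared behind `Arc<dyn Notifier>` by the order rule, the state-sync trigger, and the bootstrapper, where the old `&mut self` signature forced exclusive ownership via `Box<dyn Notifier>`. A minimal sketch of the pattern with a simplified trait; `ChannelNotifier` and the `Vec<u64>` payload are illustrative stand-ins, not types from this change:

    use std::sync::Arc;

    trait Notifier: Send + Sync {
        fn send_ordered_nodes(&self, rounds: Vec<u64>) -> anyhow::Result<()>;
    }

    struct ChannelNotifier {
        tx: futures_channel::mpsc::UnboundedSender<Vec<u64>>,
    }

    impl Notifier for ChannelNotifier {
        // `&self` suffices: UnboundedSender::unbounded_send itself takes `&self`.
        fn send_ordered_nodes(&self, rounds: Vec<u64>) -> anyhow::Result<()> {
            self.tx
                .unbounded_send(rounds)
                .map_err(|_| anyhow::anyhow!("receiver dropped"))
        }
    }

    fn share(notifier: Arc<dyn Notifier>) {
        // Clones of the Arc can move into different tasks; no `&mut` aliasing is needed.
        let a = notifier.clone();
        std::thread::spawn(move || a.send_ordered_nodes(vec![1]).unwrap());
        std::thread::spawn(move || notifier.send_ordered_nodes(vec![2]).unwrap());
    }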
diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs
index a28c613e24c0c2..45fd5b6dd9d7e2 100644
--- a/consensus/src/dag/bootstrap.rs
+++ b/consensus/src/dag/bootstrap.rs
@@ -1,12 +1,13 @@
 // Copyright © Aptos Foundation
 
 use super::{
+    adapter::Notifier,
     anchor_election::RoundRobinAnchorElection,
     dag_driver::DagDriver,
     dag_fetcher::{DagFetcherService, FetchRequestHandler},
     dag_handler::NetworkHandler,
     dag_network::TDAGNetworkSender,
-    dag_state_sync::DAG_WINDOW,
+    dag_state_sync::{DagStateSynchronizer, StateSyncTrigger, DAG_WINDOW},
     dag_store::Dag,
     order_rule::OrderRule,
     rb_handler::NodeBroadcastHandler,
@@ -14,120 +15,282 @@ use super::{
     types::DAGMessage,
 };
 use crate::{
-    dag::adapter::NotifierAdapter, experimental::buffer_manager::OrderedBlocks,
-    network::IncomingDAGRequest, state_replication::PayloadClient,
+    dag::{adapter::NotifierAdapter, dag_fetcher::DagFetcher},
+    experimental::buffer_manager::OrderedBlocks,
+    network::IncomingDAGRequest,
+    state_replication::{PayloadClient, StateComputer},
 };
-use aptos_channels::{aptos_channel, message_queues::QueueStyle};
-use aptos_consensus_types::common::Author;
+use aptos_channels::{
+    aptos_channel::{self, Receiver},
+    message_queues::QueueStyle,
+};
+use aptos_consensus_types::common::{Author, Round};
+use aptos_crypto::HashValue;
 use aptos_infallible::RwLock;
+use aptos_logger::error;
 use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
 use aptos_types::{
-    epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner,
+    aggregate_signature::AggregateSignature,
+    block_info::BlockInfo,
+    epoch_state::EpochState,
+    ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
+    validator_signer::ValidatorSigner,
+};
+use futures::{FutureExt, StreamExt};
+use futures_channel::{
+    mpsc::{UnboundedReceiver, UnboundedSender},
+    oneshot,
 };
-use futures::stream::{AbortHandle, Abortable};
 use std::{sync::Arc, time::Duration};
+use tokio::{select, task::JoinHandle};
 use tokio_retry::strategy::ExponentialBackoff;
 
-pub fn bootstrap_dag(
+struct DagBootstrapper {
     self_peer: Author,
     signer: ValidatorSigner,
     epoch_state: Arc<EpochState>,
-    latest_ledger_info: LedgerInfo,
     storage: Arc<dyn DAGStorage>,
     rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
     dag_network_sender: Arc<dyn TDAGNetworkSender>,
     time_service: aptos_time_service::TimeService,
     payload_client: Arc<dyn PayloadClient>,
-) -> (
-    AbortHandle,
-    AbortHandle,
-    aptos_channel::Sender<Author, IncomingDAGRequest>,
-    futures_channel::mpsc::UnboundedReceiver<OrderedBlocks>,
-) {
-    let validators = epoch_state.verifier.get_ordered_account_addresses();
-    let current_round = latest_ledger_info.round();
+    state_computer: Arc<dyn StateComputer>,
+}
 
-    let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
-    let adapter = Box::new(NotifierAdapter::new(ordered_nodes_tx, storage.clone()));
-    let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);
+impl DagBootstrapper {
+    fn new(
+        self_peer: Author,
+        signer: ValidatorSigner,
+        epoch_state: Arc<EpochState>,
+        storage: Arc<dyn DAGStorage>,
+        rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
+        dag_network_sender: Arc<dyn TDAGNetworkSender>,
+        time_service: aptos_time_service::TimeService,
+        payload_client: Arc<dyn PayloadClient>,
+        state_computer: Arc<dyn StateComputer>,
+    ) -> Self {
+        Self {
+            self_peer,
+            signer,
+            epoch_state,
+            storage,
+            rb_network_sender,
+            dag_network_sender,
+            time_service,
+            payload_client,
+            state_computer,
+        }
+    }
 
-    // A backoff policy that starts at 100ms and doubles each iteration.
-    let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50);
-    let rb = Arc::new(ReliableBroadcast::new(
-        validators.clone(),
-        rb_network_sender,
-        rb_backoff_policy,
-        time_service.clone(),
-        // TODO: add to config
-        Duration::from_millis(500),
-    ));
-
-    let dag = Arc::new(RwLock::new(Dag::new(
-        epoch_state.clone(),
-        storage.clone(),
-        current_round,
-        DAG_WINDOW,
-    )));
-
-    let anchor_election = Box::new(RoundRobinAnchorElection::new(validators));
-    let order_rule = OrderRule::new(
-        epoch_state.clone(),
-        latest_ledger_info,
-        dag.clone(),
-        anchor_election,
-        adapter,
-        storage.clone(),
-    );
+    fn bootstrap_dag_store(
+        &self,
+        initial_round: Round,
+        latest_ledger_info: LedgerInfo,
+        notifier: Arc<dyn Notifier>,
+    ) -> (Arc<RwLock<Dag>>, OrderRule) {
+        let dag = Arc::new(RwLock::new(Dag::new(
+            self.epoch_state.clone(),
+            self.storage.clone(),
+            initial_round,
+            DAG_WINDOW,
+        )));
 
-    let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) =
-        DagFetcherService::new(
-            epoch_state.clone(),
-            dag_network_sender,
+        let validators = self.epoch_state.verifier.get_ordered_account_addresses();
+        let anchor_election = Box::new(RoundRobinAnchorElection::new(validators));
+
+        let order_rule = OrderRule::new(
+            self.epoch_state.clone(),
+            latest_ledger_info,
             dag.clone(),
-            time_service.clone(),
+            anchor_election,
+            notifier,
+            self.storage.clone(),
         );
-    let fetch_requester = Arc::new(fetch_requester);
 
-    let dag_driver = DagDriver::new(
+        (dag, order_rule)
+    }
+
+    fn bootstrap_components(
+        &self,
+        dag: Arc<RwLock<Dag>>,
+        order_rule: OrderRule,
+        state_sync_trigger: StateSyncTrigger,
+    ) -> (NetworkHandler, DagFetcherService) {
+        let validators = self.epoch_state.verifier.get_ordered_account_addresses();
+
+        // A backoff policy that starts at 100ms and doubles each iteration.
+        let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50);
+        let rb = Arc::new(ReliableBroadcast::new(
+            validators.clone(),
+            self.rb_network_sender.clone(),
+            rb_backoff_policy,
+            self.time_service.clone(),
+            // TODO: add to config
+            Duration::from_millis(500),
+        ));
+
+        let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) =
+            DagFetcherService::new(
+                self.epoch_state.clone(),
+                self.dag_network_sender.clone(),
+                dag.clone(),
+                self.time_service.clone(),
+            );
+        let fetch_requester = Arc::new(fetch_requester);
+
+        let dag_driver = DagDriver::new(
+            self.self_peer,
+            self.epoch_state.clone(),
+            dag.clone(),
+            self.payload_client.clone(),
+            rb,
+            self.time_service.clone(),
+            self.storage.clone(),
+            order_rule,
+            fetch_requester.clone(),
+        );
+        let rb_handler = NodeBroadcastHandler::new(
+            dag.clone(),
+            self.signer.clone(),
+            self.epoch_state.clone(),
+            self.storage.clone(),
+            fetch_requester,
+        );
+        let fetch_handler = FetchRequestHandler::new(dag, self.epoch_state.clone());
+
+        let dag_handler = NetworkHandler::new(
+            self.epoch_state.clone(),
+            rb_handler,
+            dag_driver,
+            fetch_handler,
+            node_fetch_waiter,
+            certified_node_fetch_waiter,
+            state_sync_trigger,
+        );
+
+        (dag_handler, dag_fetcher)
+    }
+
+    async fn bootstrapper(
+        self,
+        mut dag_rpc_rx: Receiver<Author, IncomingDAGRequest>,
+        ordered_nodes_tx: UnboundedSender<OrderedBlocks>,
+        shutdown_rx: oneshot::Receiver<()>,
+    ) -> anyhow::Result<()> {
+        let adapter = Arc::new(NotifierAdapter::new(
+            ordered_nodes_tx,
+            self.storage.clone(),
+        ));
+
+        let sync_manager = DagStateSynchronizer::new(
+            self.epoch_state.clone(),
+            adapter.clone(),
+            self.time_service.clone(),
+            self.state_computer.clone(),
+            self.storage.clone(),
+        );
+
+        // TODO: fetch the correct block info
+        let ledger_info = LedgerInfoWithSignatures::new(
+            LedgerInfo::new(BlockInfo::empty(), HashValue::zero()),
+            AggregateSignature::empty(),
+        );
+
+        let mut shutdown_rx = shutdown_rx.into_stream();
+
+        loop {
+            let (rebootstrap_notification_tx, mut rebootstrap_notification_rx) =
+                tokio::sync::mpsc::channel(1);
+
+            // TODO: fix
+            let (dag_store, order_rule) =
+                self.bootstrap_dag_store(0, ledger_info.ledger_info().clone(), adapter.clone());
+
+            let state_sync_trigger = StateSyncTrigger::new(
+                dag_store.clone(),
+                adapter.clone(),
+                rebootstrap_notification_tx,
+            );
+
+            let (handler, fetch_service) =
+                self.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger);
+
+            let df_handle = tokio::spawn(fetch_service.start());
+
+            // poll the network handler while waiting for rebootstrap notification or shutdown notification
+            select! {
+                biased;
+                _ = shutdown_rx.select_next_some() => {
+                    df_handle.abort();
+                    let _ = df_handle.await;
+                    return Ok(());
+                },
+                Some(node) = rebootstrap_notification_rx.recv() => {
+                    df_handle.abort();
+                    let _ = df_handle.await;
+
+                    let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone());
+
+                    if let Err(e) = sync_manager.sync_dag_to(&node, dag_fetcher, dag_store.clone()).await {
+                        error!(error = ?e, "unable to sync");
+                    }
+                },
+                _ = handler.start(&mut dag_rpc_rx) => {}
+            }
+        }
+    }
+}
+
+pub(super) fn bootstrap_dag_for_test(
+    self_peer: Author,
+    signer: ValidatorSigner,
+    epoch_state: Arc<EpochState>,
+    latest_ledger_info: LedgerInfo,
+    storage: Arc<dyn DAGStorage>,
+    rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
+    dag_network_sender: Arc<dyn TDAGNetworkSender>,
+    time_service: aptos_time_service::TimeService,
+    payload_client: Arc<dyn PayloadClient>,
+    state_computer: Arc<dyn StateComputer>,
+) -> (
+    JoinHandle<()>,
+    JoinHandle<()>,
+    aptos_channel::Sender<Author, IncomingDAGRequest>,
+    UnboundedReceiver<OrderedBlocks>,
+) {
+    let bootstrapper = DagBootstrapper::new(
         self_peer,
-        epoch_state.clone(),
-        dag.clone(),
-        payload_client,
-        rb,
-        time_service,
-        storage.clone(),
-        order_rule,
-        fetch_requester.clone(),
-    );
-    let rb_handler = NodeBroadcastHandler::new(
-        dag.clone(),
         signer,
-        epoch_state.clone(),
+        epoch_state,
         storage.clone(),
-        fetch_requester,
+        rb_network_sender,
+        dag_network_sender,
+        time_service,
+        payload_client,
+        state_computer,
     );
-    let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone());
 
-    let dag_handler = NetworkHandler::new(
-        epoch_state,
-        dag_rpc_rx,
-        rb_handler,
-        dag_driver,
-        fetch_handler,
-        node_fetch_waiter,
-        certified_node_fetch_waiter,
+    let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
+    let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, storage.clone()));
+    let (dag_rpc_tx, mut dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);
+    let (rebootstrap_notification_tx, _rebootstrap_notification_rx) = tokio::sync::mpsc::channel(1);
+
+    let (dag_store, order_rule) =
+        bootstrapper.bootstrap_dag_store(0, latest_ledger_info, adapter.clone());
+
+    let state_sync_trigger = StateSyncTrigger::new(
+        dag_store.clone(),
+        adapter.clone(),
+        rebootstrap_notification_tx,
     );
 
-    let (nh_abort_handle, nh_abort_registration) = AbortHandle::new_pair();
-    let (df_abort_handle, df_abort_registration) = AbortHandle::new_pair();
+    let (handler, fetch_service) =
+        bootstrapper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger);
 
-    tokio::spawn(Abortable::new(dag_handler.start(), nh_abort_registration));
-    tokio::spawn(Abortable::new(dag_fetcher.start(), df_abort_registration));
+    let dh_handle = tokio::spawn(async move {
+        handler.start(&mut dag_rpc_rx).await
+    });
+    let df_handle = tokio::spawn(fetch_service.start());
 
-    (
-        nh_abort_handle,
-        df_abort_handle,
-        dag_rpc_tx,
-        ordered_nodes_rx,
-    )
+    (dh_handle, df_handle, dag_rpc_tx, ordered_nodes_rx)
 }
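How the rebootstrap loop above is expected to be driven: the caller spawns `bootstrapper` and keeps only a oneshot sender for shutdown; the RPC receiver and ordered-nodes sender outlive every `NetworkHandler` incarnation inside the loop. A sketch of the assumed call site (`run_dag` is hypothetical; `bootstrapper` is the private method added above):

    async fn run_dag(
        bootstrapper: DagBootstrapper,
        dag_rpc_rx: Receiver<Author, IncomingDAGRequest>,
        ordered_nodes_tx: UnboundedSender<OrderedBlocks>,
    ) {
        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        // The loop runs until the shutdown arm of its select! fires.
        let task = tokio::spawn(bootstrapper.bootstrapper(dag_rpc_rx, ordered_nodes_tx, shutdown_rx));
        // ... epoch is active ...
        let _ = shutdown_tx.send(()); // request shutdown
        let _ = task.await;           // bootstrapper returns Ok(()) after aborting the fetcher
    }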
diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs
index 9d9406725eea04..84e32feb0d5795 100644
--- a/consensus/src/dag/dag_driver.rs
+++ b/consensus/src/dag/dag_driver.rs
@@ -22,7 +22,12 @@ use aptos_infallible::RwLock;
 use aptos_logger::error;
 use aptos_reliable_broadcast::ReliableBroadcast;
 use aptos_time_service::{TimeService, TimeServiceTrait};
-use aptos_types::{block_info::Round, epoch_state::EpochState};
+use aptos_types::{
+    aggregate_signature::AggregateSignature,
+    block_info::{BlockInfo, Round},
+    epoch_state::EpochState,
+    ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
+};
 use futures::{
     future::{AbortHandle, Abortable},
     FutureExt,
@@ -160,7 +165,6 @@ impl DagDriver {
             .broadcast(node.clone(), signature_builder)
             .then(move |certificate| {
                 let certified_node = CertifiedNode::new(node, certificate.signatures().to_owned());
-
                 let certified_node_msg = CertifiedNodeMessage::new(certified_node, latest_ledger_info);
                 rb.broadcast(certified_node_msg, cert_ack_set)
diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs
index 92841f1b36a8a2..3a9c35cbff8997 100644
--- a/consensus/src/dag/dag_handler.rs
+++ b/consensus/src/dag/dag_handler.rs
@@ -3,6 +3,7 @@
 use super::{
     dag_driver::DagDriver,
     dag_fetcher::{FetchRequestHandler, FetchWaiter},
+    dag_state_sync::StateSyncTrigger,
     types::TDAGMessage,
     CertifiedNode, Node,
 };
@@ -23,40 +24,43 @@ use tokio::select;
 
 pub(crate) struct NetworkHandler {
     epoch_state: Arc<EpochState>,
-    dag_rpc_rx: aptos_channel::Receiver<Author, IncomingDAGRequest>,
     node_receiver: NodeBroadcastHandler,
     dag_driver: DagDriver,
     fetch_receiver: FetchRequestHandler,
     node_fetch_waiter: FetchWaiter<Node>,
     certified_node_fetch_waiter: FetchWaiter<CertifiedNode>,
+    state_sync_trigger: StateSyncTrigger,
 }
 
 impl NetworkHandler {
-    pub fn new(
+    pub(super) fn new(
         epoch_state: Arc<EpochState>,
-        dag_rpc_rx: aptos_channel::Receiver<Author, IncomingDAGRequest>,
         node_receiver: NodeBroadcastHandler,
         dag_driver: DagDriver,
         fetch_receiver: FetchRequestHandler,
         node_fetch_waiter: FetchWaiter<Node>,
         certified_node_fetch_waiter: FetchWaiter<CertifiedNode>,
+        state_sync_trigger: StateSyncTrigger,
    ) -> Self {
         Self {
             epoch_state,
-            dag_rpc_rx,
             node_receiver,
             dag_driver,
             fetch_receiver,
             node_fetch_waiter,
             certified_node_fetch_waiter,
+            state_sync_trigger,
         }
     }
 
-    pub async fn start(mut self) {
+    pub async fn start(
+        mut self,
+        dag_rpc_rx: &mut aptos_channel::Receiver<Author, IncomingDAGRequest>,
+    ) {
         // TODO(ibalajiarun): clean up Reliable Broadcast storage periodically.
         loop {
             select! {
-                Some(msg) = self.dag_rpc_rx.next() => {
+                Some(msg) = dag_rpc_rx.next() => {
                     if let Err(e) = self.process_rpc(msg).await {
                         warn!(error = ?e, "error processing rpc");
                     }
@@ -90,10 +94,15 @@ impl NetworkHandler {
                 .verify(&self.epoch_state.verifier)
                 .and_then(|_| self.node_receiver.process(node))
                 .map(|r| r.into()),
-            DAGMessage::CertifiedNodeMsg(node) => node
-                .verify(&self.epoch_state.verifier)
-                .and_then(|_| self.dag_driver.process(node.certified_node()))
-                .map(|r| r.into()),
+            DAGMessage::CertifiedNodeMsg(node) => match node.verify(&self.epoch_state.verifier) {
+                Ok(_) => {
+                    let node = self.state_sync_trigger.check(node).await;
+                    self.dag_driver
+                        .process(node.certified_node())
+                        .map(|r| r.into())
+                },
+                Err(e) => Err(e),
+            },
             DAGMessage::FetchRequest(request) => request
                 .verify(&self.epoch_state.verifier)
                 .and_then(|_| self.fetch_receiver.process(request))
@@ -117,5 +126,6 @@ impl NetworkHandler {
             .response_sender
             .send(response)
             .map_err(|_| anyhow::anyhow!("unable to respond to rpc"))
+            .into()
     }
 }
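The key ownership change above: `NetworkHandler::start` now borrows the RPC receiver instead of owning it, so the channel and any queued messages survive each handler incarnation that the bootstrap loop tears down. The shape of the pattern, reduced to toy types (illustrative only, not code from this diff):

    use tokio::sync::mpsc;

    struct Handler;

    impl Handler {
        // Borrowing (&mut) mirrors NetworkHandler::start: returning from this
        // method drops the handler but leaves the receiver and its backlog intact.
        // Returns true when a "rebootstrap" is requested, false when the channel closes.
        async fn run(self, rx: &mut mpsc::Receiver<u32>) -> bool {
            while let Some(msg) = rx.recv().await {
                if msg == 0 {
                    return true; // stand-in for a rebootstrap being triggered
                }
            }
            false
        }
    }

    async fn serve(mut rx: mpsc::Receiver<u32>) {
        loop {
            // A fresh handler per bootstrap round, but the same receiver: messages
            // queued while the previous handler was being torn down are not lost.
            if !Handler.run(&mut rx).await {
                return; // channel closed; shut down
            }
        }
    }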
diff --git a/consensus/src/dag/dag_state_sync.rs b/consensus/src/dag/dag_state_sync.rs
index 82b270291ca604..a98e11136674d3 100644
--- a/consensus/src/dag/dag_state_sync.rs
+++ b/consensus/src/dag/dag_state_sync.rs
@@ -2,11 +2,10 @@
 
 use super::{
     adapter::Notifier,
-    dag_fetcher::{DagFetcher, TDagFetcher},
+    dag_fetcher::TDagFetcher,
     dag_store::Dag,
     storage::DAGStorage,
     types::{CertifiedNodeMessage, RemoteFetchRequest},
-    TDAGNetworkSender,
 };
 use crate::state_replication::StateComputer;
 use aptos_consensus_types::common::Round;
@@ -24,66 +23,64 @@ use std::sync::Arc;
 pub const DAG_WINDOW: usize = 1;
 pub const STATE_SYNC_WINDOW_MULTIPLIER: usize = 30;
 
-pub(super) struct StateSyncManager {
-    epoch_state: Arc<EpochState>,
-    network: Arc<dyn TDAGNetworkSender>,
-    notifier: Arc<dyn Notifier>,
-    time_service: TimeService,
-    state_computer: Arc<dyn StateComputer>,
-    storage: Arc<dyn DAGStorage>,
+pub(super) struct StateSyncTrigger {
     dag_store: Arc<RwLock<Dag>>,
+    downstream_notifier: Arc<dyn Notifier>,
+    bootstrap_notifier: tokio::sync::mpsc::Sender<CertifiedNodeMessage>,
 }
 
-impl StateSyncManager {
-    pub fn new(
-        epoch_state: Arc<EpochState>,
-        network: Arc<dyn TDAGNetworkSender>,
-        notifier: Arc<dyn Notifier>,
-        time_service: TimeService,
-        state_computer: Arc<dyn StateComputer>,
-        storage: Arc<dyn DAGStorage>,
+impl StateSyncTrigger {
+    pub(super) fn new(
         dag_store: Arc<RwLock<Dag>>,
+        downstream_notifier: Arc<dyn Notifier>,
+        bootstrap_notifier: tokio::sync::mpsc::Sender<CertifiedNodeMessage>,
     ) -> Self {
         Self {
-            epoch_state,
-            network,
-            notifier,
-            time_service,
-            state_computer,
-            storage,
             dag_store,
+            downstream_notifier,
+            bootstrap_notifier,
         }
     }
 
-    pub async fn sync_to(
-        &self,
-        node: &CertifiedNodeMessage,
-    ) -> anyhow::Result<Option<Arc<RwLock<Dag>>>> {
-        self.sync_to_highest_commit_cert(node.ledger_info()).await;
-        self.try_sync_to_highest_ordered_anchor(node).await
+    /// This method checks whether state sync is required, and if so,
+    /// notifies the bootstrapper and yields the current task indefinitely,
+    /// so that the bootstrapper can abort this task.
+    pub(super) async fn check(&self, node: CertifiedNodeMessage) -> CertifiedNodeMessage {
+        let ledger_info = node.ledger_info();
+
+        self.notify_commit_proof(ledger_info).await;
+
+        if self.need_sync_for_ledger_info(ledger_info) {
+            self.bootstrap_notifier.send(node).await;
+            loop {
+                tokio::task::yield_now().await;
+            }
+        }
+        node
     }
 
     /// Fast forward in the decoupled-execution pipeline if the block exists there
-    pub async fn sync_to_highest_commit_cert(&self, ledger_info: &LedgerInfoWithSignatures) {
-        let send_commit_proof = {
-            let dag_reader = self.dag_store.read();
-            dag_reader.highest_committed_anchor_round() < ledger_info.commit_info().round()
-                && dag_reader
-                    .highest_ordered_anchor_round()
-                    .unwrap_or_default()
-                    >= ledger_info.commit_info().round()
-        };
-
+    async fn notify_commit_proof(&self, ledger_info: &LedgerInfoWithSignatures) {
         // if the anchor exists between ledger info round and highest ordered round
         // Note: ledger info round <= highest ordered round
-        if send_commit_proof {
-            self.notifier.send_commit_proof(ledger_info.clone()).await
+        if self.dag_store.read().highest_committed_anchor_round()
+            < ledger_info.commit_info().round()
+            && self
+                .dag_store
+                .read()
+                .highest_ordered_anchor_round()
+                .unwrap_or_default()
+                >= ledger_info.commit_info().round()
+        {
+            self.downstream_notifier
+                .send_commit_proof(ledger_info.clone())
+                .await
         }
     }
 
     /// Check if we're far away from this ledger info and need to sync.
     /// This ensures that the block referred by the ledger info is not in buffer manager.
-    pub fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool {
+    fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool {
         let dag_reader = self.dag_store.read();
         // check whether the DAG order round is behind the given ledger info round
         // (meaning consensus is behind) or
@@ -97,34 +94,55 @@ impl StateSyncManager {
             + ((STATE_SYNC_WINDOW_MULTIPLIER * DAG_WINDOW) as Round)
             < li.commit_info().round()
     }
+}
 
-    pub async fn try_sync_to_highest_ordered_anchor(
-        &self,
-        node: &CertifiedNodeMessage,
-    ) -> anyhow::Result<Option<Arc<RwLock<Dag>>>> {
-        // Check whether to actually sync
-        let commit_li = node.ledger_info();
-        if !self.need_sync_for_ledger_info(commit_li) {
-            return Ok(None);
-        }
-
-        let dag_fetcher = Arc::new(DagFetcher::new(
-            self.epoch_state.clone(),
-            self.network.clone(),
-            self.time_service.clone(),
-        ));
+pub(super) struct DagStateSynchronizer {
+    epoch_state: Arc<EpochState>,
+    notifier: Arc<dyn Notifier>,
+    time_service: TimeService,
+    state_computer: Arc<dyn StateComputer>,
+    storage: Arc<dyn DAGStorage>,
+}
 
-        self.sync_to_highest_ordered_anchor(node, dag_fetcher).await
+impl DagStateSynchronizer {
+    pub fn new(
+        epoch_state: Arc<EpochState>,
+        notifier: Arc<dyn Notifier>,
+        time_service: TimeService,
+        state_computer: Arc<dyn StateComputer>,
+        storage: Arc<dyn DAGStorage>,
+    ) -> Self {
+        Self {
+            epoch_state,
+            notifier,
+            time_service,
+            state_computer,
+            storage,
+        }
     }
 
     /// Note: Assumes that the sync checks have been done
-    pub async fn sync_to_highest_ordered_anchor(
+    pub async fn sync_dag_to(
         &self,
         node: &CertifiedNodeMessage,
-        dag_fetcher: Arc<dyn TDagFetcher>,
-    ) -> anyhow::Result<Option<Arc<RwLock<Dag>>>> {
+        dag_fetcher: impl TDagFetcher,
+        current_dag_store: Arc<RwLock<Dag>>,
+    ) -> anyhow::Result<()> {
         let commit_li = node.ledger_info();
 
+        {
+            let dag_reader = current_dag_store.read();
+            assert!(
+                dag_reader
+                    .highest_ordered_anchor_round()
+                    .unwrap_or_default()
+                    < commit_li.commit_info().round()
+                    || dag_reader.highest_committed_anchor_round()
+                        + ((STATE_SYNC_WINDOW_MULTIPLIER * DAG_WINDOW) as Round)
+                        < commit_li.commit_info().round()
+            );
+        }
+
         if commit_li.ledger_info().ends_epoch() {
             self.notifier
                 .send_epoch_change(EpochChangeProof::new(
@@ -133,7 +151,7 @@ impl StateSyncManager {
                 ))
                 .await;
             // TODO: make sure to terminate DAG and yield to epoch manager
-            return Ok(None);
+            return Ok(());
         }
 
         // TODO: there is a case where DAG fetches missing nodes in window and a crash happens and when we restart,
@@ -179,6 +197,6 @@ impl StateSyncManager {
 
         // TODO: the caller should rebootstrap the order rule
-        Ok(Some(sync_dag_store))
+        Ok(())
     }
 }
diff --git a/consensus/src/dag/order_rule.rs b/consensus/src/dag/order_rule.rs
index 52687efd80cdef..b86ebaede12b33 100644
--- a/consensus/src/dag/order_rule.rs
+++ b/consensus/src/dag/order_rule.rs
@@ -20,7 +20,7 @@ pub struct OrderRule {
     lowest_unordered_anchor_round: Round,
     dag: Arc<RwLock<Dag>>,
     anchor_election: Box<dyn AnchorElection>,
-    notifier: Box<dyn Notifier>,
+    notifier: Arc<dyn Notifier>,
     storage: Arc<dyn DAGStorage>,
 }
 
@@ -30,7 +30,7 @@ impl OrderRule {
         latest_ledger_info: LedgerInfo,
         dag: Arc<RwLock<Dag>>,
         anchor_election: Box<dyn AnchorElection>,
-        notifier: Box<dyn Notifier>,
+        notifier: Arc<dyn Notifier>,
         storage: Arc<dyn DAGStorage>,
     ) -> Self {
         // TODO: we need to initialize the anchor election based on the dag
diff --git a/consensus/src/dag/tests/dag_driver_tests.rs b/consensus/src/dag/tests/dag_driver_tests.rs
index cc0c554d074ca2..1a96ab14229f54 100644
--- a/consensus/src/dag/tests/dag_driver_tests.rs
+++ b/consensus/src/dag/tests/dag_driver_tests.rs
@@ -104,7 +104,7 @@ async fn test_certified_node_handler() {
         LedgerInfo::mock_genesis(None),
         dag.clone(),
         Box::new(RoundRobinAnchorElection::new(validators)),
-        Box::new(TestNotifier { tx }),
+        Arc::new(TestNotifier { tx }),
         storage.clone(),
     );
diff --git a/consensus/src/dag/tests/dag_state_sync_tests.rs b/consensus/src/dag/tests/dag_state_sync_tests.rs
index 4644590ef78ce3..ec9b16db3d84df 100644
--- a/consensus/src/dag/tests/dag_state_sync_tests.rs
+++ b/consensus/src/dag/tests/dag_state_sync_tests.rs
@@ -4,7 +4,7 @@ use crate::{
     dag::{
         adapter::Notifier,
         dag_fetcher::{FetchRequestHandler, TDagFetcher},
-        dag_state_sync::{StateSyncManager, DAG_WINDOW},
+        dag_state_sync::DAG_WINDOW,
         dag_store::Dag,
         storage::DAGStorage,
         tests::{dag_test::MockStorage, helpers::generate_dag_nodes},
@@ -100,7 +100,7 @@ struct MockNotifier {}
 #[async_trait]
 impl Notifier for MockNotifier {
     fn send_ordered_nodes(
-        &mut self,
+        &self,
         _ordered_nodes: Vec<Arc<CertifiedNode>>,
         _failed_author: Vec<(Round, Author)>,
     ) -> anyhow::Result<()> {
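For reference, the window arithmetic behind `need_sync_for_ledger_info`: with `DAG_WINDOW = 1` and `STATE_SYNC_WINDOW_MULTIPLIER = 30`, sync triggers once the committed ledger info is more than 30 rounds past the local highest committed anchor. A self-contained sketch of just that branch (the real predicate also checks the highest ordered anchor round):

    const DAG_WINDOW: usize = 1;
    const STATE_SYNC_WINDOW_MULTIPLIER: usize = 30;

    fn needs_sync(highest_committed_anchor_round: u64, li_commit_round: u64) -> bool {
        // Mirrors: highest_committed_anchor_round + 30 * 1 < li.commit_info().round()
        highest_committed_anchor_round + (STATE_SYNC_WINDOW_MULTIPLIER * DAG_WINDOW) as u64
            < li_commit_round
    }

    #[test]
    fn sync_threshold() {
        assert!(!needs_sync(100, 130)); // exactly at the window boundary: no sync
        assert!(needs_sync(100, 131)); // past the window: trigger state sync
    }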
diff --git a/consensus/src/dag/tests/integration_tests.rs b/consensus/src/dag/tests/integration_tests.rs
index 5e61493b8f9966..b6cf7f03868ff4 100644
--- a/consensus/src/dag/tests/integration_tests.rs
+++ b/consensus/src/dag/tests/integration_tests.rs
@@ -2,12 +2,14 @@
 
 use super::dag_test;
 use crate::{
-    dag::bootstrap::bootstrap_dag,
+    dag::bootstrap::bootstrap_dag_for_test,
     experimental::buffer_manager::OrderedBlocks,
     network::{DAGNetworkSenderImpl, IncomingDAGRequest, NetworkSender},
     network_interface::{ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC},
     network_tests::{NetworkPlayground, TwinId},
-    test_utils::{consensus_runtime, MockPayloadManager, MockStorage},
+    test_utils::{
+        consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStateComputer, MockStorage,
+    },
 };
 use aptos_channels::{aptos_channel, message_queues::QueueStyle};
 use aptos_config::network_id::{NetworkId, PeerNetworkId};
@@ -23,6 +25,7 @@
     transport::ConnectionMetadata,
     ProtocolId,
 };
+use aptos_storage_interface::mock::MockDbReaderWriter;
 use aptos_time_service::TimeService;
 use aptos_types::{
     epoch_state::EpochState,
@@ -37,10 +40,11 @@
 use futures_channel::mpsc::UnboundedReceiver;
 use maplit::hashmap;
 use std::sync::Arc;
+use tokio::task::JoinHandle;
 
 struct DagBootstrapUnit {
-    nh_abort_handle: AbortHandle,
-    df_abort_handle: AbortHandle,
+    nh_task_handle: JoinHandle<()>,
+    df_task_handle: JoinHandle<()>,
     dag_rpc_tx: aptos_channel::Sender<Author, IncomingDAGRequest>,
     network_events:
         Box<Select<NetworkEvents<ConsensusMsg>, aptos_channels::Receiver<Event<ConsensusMsg>>>>,
@@ -70,22 +74,26 @@ impl DagBootstrapUnit {
 
         let payload_client = Arc::new(MockPayloadManager::new(None));
 
-        let (nh_abort_handle, df_abort_handle, dag_rpc_tx, ordered_nodes_rx) = bootstrap_dag(
-            self_peer,
-            signer,
-            Arc::new(epoch_state),
-            storage.get_ledger_info(),
-            Arc::new(dag_storage),
-            network.clone(),
-            network.clone(),
-            time_service,
-            payload_client,
-        );
+        let state_computer = Arc::new(EmptyStateComputer {});
+
+        let (nh_abort_handle, df_abort_handle, dag_rpc_tx, ordered_nodes_rx) =
+            bootstrap_dag_for_test(
+                self_peer,
+                signer,
+                Arc::new(epoch_state),
+                storage.get_ledger_info(),
+                Arc::new(dag_storage),
+                network.clone(),
+                network.clone(),
+                time_service,
+                payload_client,
+                state_computer,
+            );
 
         (
             Self {
-                nh_abort_handle,
-                df_abort_handle,
+                nh_task_handle: nh_abort_handle,
+                df_task_handle: df_abort_handle,
                 dag_rpc_tx,
                 network_events,
             },
diff --git a/consensus/src/dag/tests/order_rule_tests.rs b/consensus/src/dag/tests/order_rule_tests.rs
index c403820cd89ca5..f597a8dc428ce7 100644
--- a/consensus/src/dag/tests/order_rule_tests.rs
+++ b/consensus/src/dag/tests/order_rule_tests.rs
@@ -87,7 +87,7 @@ pub struct TestNotifier {
 #[async_trait]
 impl Notifier for TestNotifier {
     fn send_ordered_nodes(
-        &mut self,
+        &self,
         ordered_nodes: Vec<Arc<CertifiedNode>>,
         _failed_authors: Vec<(Round, Author)>,
     ) -> anyhow::Result<()> {
@@ -118,7 +118,7 @@ fn create_order_rule(
         ledger_info,
         dag,
         anchor_election,
-        Box::new(TestNotifier { tx }),
+        Arc::new(TestNotifier { tx }),
         Arc::new(MockStorage::new()),
     ),
     rx,
diff --git a/consensus/src/dag/types.rs b/consensus/src/dag/types.rs
index ce49a214bc29d7..1c03dc64ba5740 100644
--- a/consensus/src/dag/types.rs
+++ b/consensus/src/dag/types.rs
@@ -431,13 +431,13 @@ impl CertifiedNodeMessage {
         }
     }
 
-    pub fn ledger_info(&self) -> &LedgerInfoWithSignatures {
-        &self.ledger_info
-    }
-
     pub fn certified_node(self) -> CertifiedNode {
         self.inner
     }
+
+    pub fn ledger_info(&self) -> &LedgerInfoWithSignatures {
+        &self.ledger_info
+    }
 }
 
 impl Deref for CertifiedNodeMessage {
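A note on the `AbortHandle` to `JoinHandle` move in these tests: a `JoinHandle` can both cancel a task and report how it ended, which `Abortable`'s fire-and-forget handle could not. A sketch of the assumed teardown (`stop` is a hypothetical helper, not part of this diff):

    async fn stop(handles: Vec<tokio::task::JoinHandle<()>>) {
        for handle in handles {
            handle.abort(); // cooperative cancellation at the next await point
            match handle.await {
                Ok(()) => {}                     // finished before the abort landed
                Err(e) if e.is_cancelled() => {} // cancelled, as requested
                Err(e) => panic!("task panicked: {e}"), // surface a real task panic
            }
        }
    }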