diff --git a/consensus/src/dag/adapter.rs b/consensus/src/dag/adapter.rs index 6e3091fd3421d..6c7846124bf91 100644 --- a/consensus/src/dag/adapter.rs +++ b/consensus/src/dag/adapter.rs @@ -31,9 +31,9 @@ use futures_channel::mpsc::UnboundedSender; use std::{collections::HashMap, sync::Arc}; #[async_trait] -pub trait Notifier: Send { +pub trait Notifier: Send + Sync { fn send_ordered_nodes( - &mut self, + &self, ordered_nodes: Vec>, failed_author: Vec<(Round, Author)>, ) -> anyhow::Result<()>; @@ -62,7 +62,7 @@ impl NotifierAdapter { #[async_trait] impl Notifier for NotifierAdapter { fn send_ordered_nodes( - &mut self, + &self, ordered_nodes: Vec>, failed_author: Vec<(Round, Author)>, ) -> anyhow::Result<()> { diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index a28c613e24c0c..0e152a697258e 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -1,133 +1,273 @@ // Copyright © Aptos Foundation use super::{ + adapter::Notifier, anchor_election::RoundRobinAnchorElection, dag_driver::DagDriver, dag_fetcher::{DagFetcherService, FetchRequestHandler}, dag_handler::NetworkHandler, dag_network::TDAGNetworkSender, - dag_state_sync::DAG_WINDOW, + dag_state_sync::{DagStateSynchronizer, StateSyncTrigger, DAG_WINDOW}, dag_store::Dag, order_rule::OrderRule, rb_handler::NodeBroadcastHandler, storage::DAGStorage, - types::DAGMessage, + types::{CertifiedNodeMessage, DAGMessage}, }; use crate::{ - dag::adapter::NotifierAdapter, experimental::buffer_manager::OrderedBlocks, - network::IncomingDAGRequest, state_replication::PayloadClient, + dag::{adapter::NotifierAdapter, dag_fetcher::DagFetcher}, + experimental::buffer_manager::OrderedBlocks, + network::IncomingDAGRequest, + state_replication::{PayloadClient, StateComputer}, +}; +use aptos_channels::{ + aptos_channel::{self, Receiver}, + message_queues::QueueStyle, }; -use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_consensus_types::common::Author; +use aptos_crypto::HashValue; use aptos_infallible::RwLock; +use aptos_logger::error; use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast}; use aptos_types::{ - epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner, + aggregate_signature::AggregateSignature, + block_info::BlockInfo, + epoch_state::EpochState, + ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, + validator_signer::ValidatorSigner, +}; +use futures_channel::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + oneshot, }; -use futures::stream::{AbortHandle, Abortable}; use std::{sync::Arc, time::Duration}; +use tokio::{select, task::JoinHandle}; use tokio_retry::strategy::ExponentialBackoff; -pub fn bootstrap_dag( +struct DagBootstrapper { self_peer: Author, - signer: ValidatorSigner, + signer: Arc, epoch_state: Arc, - latest_ledger_info: LedgerInfo, storage: Arc, rb_network_sender: Arc>, dag_network_sender: Arc, time_service: aptos_time_service::TimeService, payload_client: Arc, -) -> ( - AbortHandle, - AbortHandle, - aptos_channel::Sender, - futures_channel::mpsc::UnboundedReceiver, -) { - let validators = epoch_state.verifier.get_ordered_account_addresses(); - let current_round = latest_ledger_info.round(); + state_computer: Arc, +} - let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded(); - let adapter = Box::new(NotifierAdapter::new(ordered_nodes_tx, storage.clone())); - let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None); +impl DagBootstrapper { + fn new( + self_peer: Author, + signer: Arc, + epoch_state: Arc, + storage: Arc, + rb_network_sender: Arc>, + dag_network_sender: Arc, + time_service: aptos_time_service::TimeService, + payload_client: Arc, + state_computer: Arc, + ) -> Self { + Self { + self_peer, + signer, + epoch_state, + storage, + rb_network_sender, + dag_network_sender, + time_service, + payload_client, + state_computer, + } + } - // A backoff policy that starts at 100ms and doubles each iteration. - let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50); - let rb = Arc::new(ReliableBroadcast::new( - validators.clone(), - rb_network_sender, - rb_backoff_policy, - time_service.clone(), - // TODO: add to config - Duration::from_millis(500), - )); - - let dag = Arc::new(RwLock::new(Dag::new( - epoch_state.clone(), - storage.clone(), - current_round, - DAG_WINDOW, - ))); - - let anchor_election = Box::new(RoundRobinAnchorElection::new(validators)); - let order_rule = OrderRule::new( - epoch_state.clone(), - latest_ledger_info, - dag.clone(), - anchor_election, - adapter, - storage.clone(), - ); + fn bootstrap_dag_store( + &self, + latest_ledger_info: LedgerInfo, + notifier: Arc, + ) -> (Arc>, OrderRule) { + let dag = Arc::new(RwLock::new(Dag::new( + self.epoch_state.clone(), + self.storage.clone(), + latest_ledger_info.round(), + DAG_WINDOW, + ))); - let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) = - DagFetcherService::new( - epoch_state.clone(), - dag_network_sender, + let validators = self.epoch_state.verifier.get_ordered_account_addresses(); + let anchor_election = Box::new(RoundRobinAnchorElection::new(validators)); + + let order_rule = OrderRule::new( + self.epoch_state.clone(), + latest_ledger_info, dag.clone(), - time_service.clone(), + anchor_election, + notifier, + self.storage.clone(), + ); + + (dag, order_rule) + } + + fn bootstrap_components( + &self, + dag: Arc>, + order_rule: OrderRule, + state_sync_trigger: StateSyncTrigger, + ) -> (NetworkHandler, DagFetcherService) { + let validators = self.epoch_state.verifier.get_ordered_account_addresses(); + + // A backoff policy that starts at 100ms and doubles each iteration. + let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50); + let rb = Arc::new(ReliableBroadcast::new( + validators.clone(), + self.rb_network_sender.clone(), + rb_backoff_policy, + self.time_service.clone(), + // TODO: add to config + Duration::from_millis(500), + )); + + let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) = + DagFetcherService::new( + self.epoch_state.clone(), + self.dag_network_sender.clone(), + dag.clone(), + self.time_service.clone(), + ); + let fetch_requester = Arc::new(fetch_requester); + + let dag_driver = DagDriver::new( + self.self_peer, + self.epoch_state.clone(), + dag.clone(), + self.payload_client.clone(), + rb, + self.time_service.clone(), + self.storage.clone(), + order_rule, + fetch_requester.clone(), + ); + let rb_handler = NodeBroadcastHandler::new( + dag.clone(), + self.signer.clone(), + self.epoch_state.clone(), + self.storage.clone(), + fetch_requester, + ); + let fetch_handler = FetchRequestHandler::new(dag, self.epoch_state.clone()); + + let dag_handler = NetworkHandler::new( + self.epoch_state.clone(), + rb_handler, + dag_driver, + fetch_handler, + node_fetch_waiter, + certified_node_fetch_waiter, + state_sync_trigger, + ); + + (dag_handler, dag_fetcher) + } + + async fn bootstrapper( + self, + mut dag_rpc_rx: Receiver, + ordered_nodes_tx: UnboundedSender, + mut shutdown_rx: oneshot::Receiver<()>, + ) { + let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, self.storage.clone())); + + let sync_manager = DagStateSynchronizer::new( + self.epoch_state.clone(), + adapter.clone(), + self.time_service.clone(), + self.state_computer.clone(), + self.storage.clone(), + ); + + // TODO: fetch the correct block info + let ledger_info = LedgerInfoWithSignatures::new( + LedgerInfo::new(BlockInfo::empty(), HashValue::zero()), + AggregateSignature::empty(), ); - let fetch_requester = Arc::new(fetch_requester); - let dag_driver = DagDriver::new( + loop { + let (dag_store, order_rule) = + self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone()); + + let state_sync_trigger = StateSyncTrigger::new(dag_store.clone(), adapter.clone()); + + let (handler, fetch_service) = + self.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger); + + let df_handle = tokio::spawn(fetch_service.start()); + + // poll the network handler while waiting for rebootstrap notification or shutdown notification + select! { + biased; + _ = &mut shutdown_rx => { + df_handle.abort(); + let _ = df_handle.await; + return; + }, + certified_node_msg = handler.run(&mut dag_rpc_rx) => { + df_handle.abort(); + let _ = df_handle.await; + + let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone()); + + if let Err(e) = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone()).await { + error!(error = ?e, "unable to sync"); + } + } + } + } + } +} + +pub(super) fn bootstrap_dag_for_test( + self_peer: Author, + signer: ValidatorSigner, + epoch_state: Arc, + latest_ledger_info: LedgerInfo, + storage: Arc, + rb_network_sender: Arc>, + dag_network_sender: Arc, + time_service: aptos_time_service::TimeService, + payload_client: Arc, + state_computer: Arc, +) -> ( + JoinHandle, + JoinHandle<()>, + aptos_channel::Sender, + UnboundedReceiver, +) { + let bootstraper = DagBootstrapper::new( self_peer, - epoch_state.clone(), - dag.clone(), - payload_client, - rb, - time_service, - storage.clone(), - order_rule, - fetch_requester.clone(), - ); - let rb_handler = NodeBroadcastHandler::new( - dag.clone(), - signer, - epoch_state.clone(), + signer.into(), + epoch_state, storage.clone(), - fetch_requester, + rb_network_sender, + dag_network_sender, + time_service, + payload_client, + state_computer, ); - let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone()); - let dag_handler = NetworkHandler::new( - epoch_state, - dag_rpc_rx, - rb_handler, - dag_driver, - fetch_handler, - node_fetch_waiter, - certified_node_fetch_waiter, - ); + let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded(); + let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, storage.clone())); + let (dag_rpc_tx, mut dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None); + + let (dag_store, order_rule) = + bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone()); + + let state_sync_trigger = StateSyncTrigger::new(dag_store.clone(), adapter.clone()); - let (nh_abort_handle, nh_abort_registration) = AbortHandle::new_pair(); - let (df_abort_handle, df_abort_registration) = AbortHandle::new_pair(); + let (handler, fetch_service) = + bootstraper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger); - tokio::spawn(Abortable::new(dag_handler.start(), nh_abort_registration)); - tokio::spawn(Abortable::new(dag_fetcher.start(), df_abort_registration)); + let dh_handle = tokio::spawn(async move { handler.run(&mut dag_rpc_rx).await }); + let df_handle = tokio::spawn(fetch_service.start()); - ( - nh_abort_handle, - df_abort_handle, - dag_rpc_tx, - ordered_nodes_rx, - ) + (dh_handle, df_handle, dag_rpc_tx, ordered_nodes_rx) } diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index 9d9406725eea0..7f8c4f535335b 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -160,7 +160,6 @@ impl DagDriver { .broadcast(node.clone(), signature_builder) .then(move |certificate| { let certified_node = CertifiedNode::new(node, certificate.signatures().to_owned()); - let certified_node_msg = CertifiedNodeMessage::new(certified_node, latest_ledger_info); rb.broadcast(certified_node_msg, cert_ack_set) diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs index 0b23e45a796ba..d4405f813e137 100644 --- a/consensus/src/dag/dag_handler.rs +++ b/consensus/src/dag/dag_handler.rs @@ -3,7 +3,11 @@ use super::{ dag_driver::DagDriver, dag_fetcher::{FetchRequestHandler, FetchWaiter}, - types::TDAGMessage, + dag_state_sync::{ + StateSyncStatus::{self, NeedsSync, Synced}, + StateSyncTrigger, + }, + types::{CertifiedNodeMessage, TDAGMessage}, CertifiedNode, Node, }; use crate::{ @@ -23,42 +27,52 @@ use tokio::select; pub(crate) struct NetworkHandler { epoch_state: Arc, - dag_rpc_rx: aptos_channel::Receiver, node_receiver: NodeBroadcastHandler, dag_driver: DagDriver, fetch_receiver: FetchRequestHandler, node_fetch_waiter: FetchWaiter, certified_node_fetch_waiter: FetchWaiter, + state_sync_trigger: StateSyncTrigger, } impl NetworkHandler { - pub fn new( + pub(super) fn new( epoch_state: Arc, - dag_rpc_rx: aptos_channel::Receiver, node_receiver: NodeBroadcastHandler, dag_driver: DagDriver, fetch_receiver: FetchRequestHandler, node_fetch_waiter: FetchWaiter, certified_node_fetch_waiter: FetchWaiter, + state_sync_trigger: StateSyncTrigger, ) -> Self { Self { epoch_state, - dag_rpc_rx, node_receiver, dag_driver, fetch_receiver, node_fetch_waiter, certified_node_fetch_waiter, + state_sync_trigger, } } - pub async fn start(mut self) { + pub async fn run( + mut self, + dag_rpc_rx: &mut aptos_channel::Receiver, + ) -> CertifiedNodeMessage { // TODO(ibalajiarun): clean up Reliable Broadcast storage periodically. loop { select! { - Some(msg) = self.dag_rpc_rx.next() => { - if let Err(e) = self.process_rpc(msg).await { - warn!(error = ?e, "error processing rpc"); + Some(msg) = dag_rpc_rx.next() => { + match self.process_rpc(msg).await { + Ok(sync_status) => { + if let StateSyncStatus::NeedsSync(certified_node_msg) = sync_status { + return certified_node_msg; + } + }, + Err(e) => { + warn!(error = ?e, "error processing rpc"); + } } }, Some(res) = self.node_fetch_waiter.next() => { @@ -75,7 +89,10 @@ impl NetworkHandler { } } - async fn process_rpc(&mut self, rpc_request: IncomingDAGRequest) -> anyhow::Result<()> { + async fn process_rpc( + &mut self, + rpc_request: IncomingDAGRequest, + ) -> anyhow::Result { let dag_message: DAGMessage = rpc_request.req.try_into()?; let author = dag_message @@ -90,10 +107,19 @@ impl NetworkHandler { .verify(&self.epoch_state.verifier) .and_then(|_| self.node_receiver.process(node)) .map(|r| r.into()), - DAGMessage::CertifiedNodeMsg(certified_node_msg) => certified_node_msg - .verify(&self.epoch_state.verifier) - .and_then(|_| self.dag_driver.process(certified_node_msg.certified_node())) - .map(|r| r.into()), + DAGMessage::CertifiedNodeMsg(certified_node_msg) => { + match certified_node_msg.verify(&self.epoch_state.verifier) { + Ok(_) => match self.state_sync_trigger.check(certified_node_msg).await { + ret @ (NeedsSync(_), None) => return Ok(ret.0), + (Synced, Some(certified_node_msg)) => self + .dag_driver + .process(certified_node_msg.certified_node()) + .map(|r| r.into()), + _ => unreachable!(), + }, + Err(e) => Err(e), + } + }, DAGMessage::FetchRequest(request) => request .verify(&self.epoch_state.verifier) .and_then(|_| self.fetch_receiver.process(request)) @@ -117,5 +143,6 @@ impl NetworkHandler { .response_sender .send(response) .map_err(|_| anyhow::anyhow!("unable to respond to rpc")) + .map(|_| StateSyncStatus::Synced) } } diff --git a/consensus/src/dag/dag_state_sync.rs b/consensus/src/dag/dag_state_sync.rs index 82b270291ca60..32af4532e2c3a 100644 --- a/consensus/src/dag/dag_state_sync.rs +++ b/consensus/src/dag/dag_state_sync.rs @@ -2,11 +2,10 @@ use super::{ adapter::Notifier, - dag_fetcher::{DagFetcher, TDagFetcher}, + dag_fetcher::TDagFetcher, dag_store::Dag, storage::DAGStorage, types::{CertifiedNodeMessage, RemoteFetchRequest}, - TDAGNetworkSender, }; use crate::state_replication::StateComputer; use aptos_consensus_types::common::Round; @@ -24,66 +23,63 @@ use std::sync::Arc; pub const DAG_WINDOW: usize = 1; pub const STATE_SYNC_WINDOW_MULTIPLIER: usize = 30; -pub(super) struct StateSyncManager { - epoch_state: Arc, - network: Arc, - notifier: Arc, - time_service: TimeService, - state_computer: Arc, - storage: Arc, +pub(super) enum StateSyncStatus { + NeedsSync(CertifiedNodeMessage), + Synced, +} + +pub(super) struct StateSyncTrigger { dag_store: Arc>, + downstream_notifier: Arc, } -impl StateSyncManager { - pub fn new( - epoch_state: Arc, - network: Arc, - notifier: Arc, - time_service: TimeService, - state_computer: Arc, - storage: Arc, - dag_store: Arc>, - ) -> Self { +impl StateSyncTrigger { + pub(super) fn new(dag_store: Arc>, downstream_notifier: Arc) -> Self { Self { - epoch_state, - network, - notifier, - time_service, - state_computer, - storage, dag_store, + downstream_notifier, } } - pub async fn sync_to( + /// This method checks if a state sync is required, and if so, + /// notifies the bootstraper and yields the current task infinitely, + /// to let the bootstraper can abort this task. + pub(super) async fn check( &self, - node: &CertifiedNodeMessage, - ) -> anyhow::Result>>> { - self.sync_to_highest_commit_cert(node.ledger_info()).await; - self.try_sync_to_highest_ordered_anchor(node).await + node: CertifiedNodeMessage, + ) -> (StateSyncStatus, Option) { + let ledger_info = node.ledger_info(); + + self.notify_commit_proof(ledger_info).await; + + if self.need_sync_for_ledger_info(ledger_info) { + return (StateSyncStatus::NeedsSync(node), None); + } + (StateSyncStatus::Synced, Some(node)) } /// Fast forward in the decoupled-execution pipeline if the block exists there - pub async fn sync_to_highest_commit_cert(&self, ledger_info: &LedgerInfoWithSignatures) { - let send_commit_proof = { - let dag_reader = self.dag_store.read(); - dag_reader.highest_committed_anchor_round() < ledger_info.commit_info().round() - && dag_reader - .highest_ordered_anchor_round() - .unwrap_or_default() - >= ledger_info.commit_info().round() - }; - + async fn notify_commit_proof(&self, ledger_info: &LedgerInfoWithSignatures) { // if the anchor exists between ledger info round and highest ordered round // Note: ledger info round <= highest ordered round - if send_commit_proof { - self.notifier.send_commit_proof(ledger_info.clone()).await + if self.dag_store.read().highest_committed_anchor_round() + < ledger_info.commit_info().round() + && self + .dag_store + .read() + .highest_ordered_anchor_round() + .unwrap_or_default() + >= ledger_info.commit_info().round() + { + self.downstream_notifier + .send_commit_proof(ledger_info.clone()) + .await } } /// Check if we're far away from this ledger info and need to sync. /// This ensures that the block referred by the ledger info is not in buffer manager. - pub fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool { + fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool { let dag_reader = self.dag_store.read(); // check whether if DAG order round is behind the given ledger info round // (meaning consensus is behind) or @@ -97,34 +93,55 @@ impl StateSyncManager { + ((STATE_SYNC_WINDOW_MULTIPLIER * DAG_WINDOW) as Round) < li.commit_info().round() } +} - pub async fn try_sync_to_highest_ordered_anchor( - &self, - node: &CertifiedNodeMessage, - ) -> anyhow::Result>>> { - // Check whether to actually sync - let commit_li = node.ledger_info(); - if !self.need_sync_for_ledger_info(commit_li) { - return Ok(None); - } - - let dag_fetcher = Arc::new(DagFetcher::new( - self.epoch_state.clone(), - self.network.clone(), - self.time_service.clone(), - )); +pub(super) struct DagStateSynchronizer { + epoch_state: Arc, + notifier: Arc, + time_service: TimeService, + state_computer: Arc, + storage: Arc, +} - self.sync_to_highest_ordered_anchor(node, dag_fetcher).await +impl DagStateSynchronizer { + pub fn new( + epoch_state: Arc, + notifier: Arc, + time_service: TimeService, + state_computer: Arc, + storage: Arc, + ) -> Self { + Self { + epoch_state, + notifier, + time_service, + state_computer, + storage, + } } /// Note: Assumes that the sync checks have been done - pub async fn sync_to_highest_ordered_anchor( + pub async fn sync_dag_to( &self, node: &CertifiedNodeMessage, - dag_fetcher: Arc, - ) -> anyhow::Result>>> { + dag_fetcher: impl TDagFetcher, + current_dag_store: Arc>, + ) -> anyhow::Result> { let commit_li = node.ledger_info(); + { + let dag_reader = current_dag_store.read(); + assert!( + dag_reader + .highest_ordered_anchor_round() + .unwrap_or_default() + < commit_li.commit_info().round() + || dag_reader.highest_committed_anchor_round() + + ((STATE_SYNC_WINDOW_MULTIPLIER * DAG_WINDOW) as Round) + < commit_li.commit_info().round() + ); + } + if commit_li.ledger_info().ends_epoch() { self.notifier .send_epoch_change(EpochChangeProof::new( @@ -179,6 +196,6 @@ impl StateSyncManager { // TODO: the caller should rebootstrap the order rule - Ok(Some(sync_dag_store)) + Ok(Arc::into_inner(sync_dag_store).map(|r| r.into_inner())) } } diff --git a/consensus/src/dag/order_rule.rs b/consensus/src/dag/order_rule.rs index 76721b3ef9e29..15478c9830da3 100644 --- a/consensus/src/dag/order_rule.rs +++ b/consensus/src/dag/order_rule.rs @@ -21,7 +21,7 @@ pub struct OrderRule { lowest_unordered_anchor_round: Round, dag: Arc>, anchor_election: Box, - notifier: Box, + notifier: Arc, storage: Arc, } @@ -31,7 +31,7 @@ impl OrderRule { latest_ledger_info: LedgerInfo, dag: Arc>, mut anchor_election: Box, - notifier: Box, + notifier: Arc, storage: Arc, ) -> Self { let committed_round = if latest_ledger_info.ends_epoch() { diff --git a/consensus/src/dag/rb_handler.rs b/consensus/src/dag/rb_handler.rs index cf740f9db56be..19e839f2a9741 100644 --- a/consensus/src/dag/rb_handler.rs +++ b/consensus/src/dag/rb_handler.rs @@ -28,7 +28,7 @@ pub enum NodeBroadcastHandleError { pub(crate) struct NodeBroadcastHandler { dag: Arc>, votes_by_round_peer: BTreeMap>, - signer: ValidatorSigner, + signer: Arc, epoch_state: Arc, storage: Arc, fetch_requester: Arc, @@ -37,7 +37,7 @@ pub(crate) struct NodeBroadcastHandler { impl NodeBroadcastHandler { pub fn new( dag: Arc>, - signer: ValidatorSigner, + signer: Arc, epoch_state: Arc, storage: Arc, fetch_requester: Arc, diff --git a/consensus/src/dag/tests/dag_driver_tests.rs b/consensus/src/dag/tests/dag_driver_tests.rs index 4aa4fd28f5e96..e2c5868275db2 100644 --- a/consensus/src/dag/tests/dag_driver_tests.rs +++ b/consensus/src/dag/tests/dag_driver_tests.rs @@ -104,7 +104,7 @@ async fn test_certified_node_handler() { LedgerInfo::mock_genesis(None), dag.clone(), Box::new(RoundRobinAnchorElection::new(validators)), - Box::new(TestNotifier { tx }), + Arc::new(TestNotifier { tx }), storage.clone(), ); diff --git a/consensus/src/dag/tests/dag_state_sync_tests.rs b/consensus/src/dag/tests/dag_state_sync_tests.rs index 4644590ef78ce..e47d654ec1b3a 100644 --- a/consensus/src/dag/tests/dag_state_sync_tests.rs +++ b/consensus/src/dag/tests/dag_state_sync_tests.rs @@ -4,7 +4,7 @@ use crate::{ dag::{ adapter::Notifier, dag_fetcher::{FetchRequestHandler, TDagFetcher}, - dag_state_sync::{StateSyncManager, DAG_WINDOW}, + dag_state_sync::{DagStateSynchronizer, DAG_WINDOW}, dag_store::Dag, storage::DAGStorage, tests::{dag_test::MockStorage, helpers::generate_dag_nodes}, @@ -100,7 +100,7 @@ struct MockNotifier {} #[async_trait] impl Notifier for MockNotifier { fn send_ordered_nodes( - &mut self, + &self, _ordered_nodes: Vec>, _failed_author: Vec<(Round, Author)>, ) -> anyhow::Result<()> { @@ -112,24 +112,17 @@ impl Notifier for MockNotifier { async fn send_commit_proof(&self, _ledger_info: LedgerInfoWithSignatures) {} } -fn setup( - epoch_state: Arc, - dag_store: Arc>, - storage: Arc, -) -> StateSyncManager { - let network = Arc::new(MockDAGNetworkSender {}); +fn setup(epoch_state: Arc, storage: Arc) -> DagStateSynchronizer { let time_service = TimeService::mock(); let state_computer = Arc::new(EmptyStateComputer {}); - let upstream_notifier = Arc::new(MockNotifier {}); + let downstream_notifier = Arc::new(MockNotifier {}); - StateSyncManager::new( + DagStateSynchronizer::new( epoch_state, - network, - upstream_notifier, + downstream_notifier, time_service, state_computer, storage, - dag_store, ) } @@ -193,24 +186,19 @@ async fn test_dag_state_sync() { let sync_node_li = CertifiedNodeMessage::new(sync_to_node, sync_to_li); - let state_sync = setup(epoch_state.clone(), slow_dag.clone(), storage.clone()); - let dag_fetcher = Arc::new(MockDagFetcher { + let state_sync = setup(epoch_state.clone(), storage.clone()); + let dag_fetcher = MockDagFetcher { target_dag: fast_dag.clone(), epoch_state: epoch_state.clone(), - }); + }; let sync_result = state_sync - .sync_to_highest_ordered_anchor(&sync_node_li, dag_fetcher) + .sync_dag_to(&sync_node_li, dag_fetcher, slow_dag.clone()) .await; let new_dag = sync_result.unwrap().unwrap(); - let dag_reader = new_dag.read(); - - assert_eq!(dag_reader.lowest_round(), (LI_ROUNDS - DAG_WINDOW) as Round); - assert_eq!(dag_reader.highest_round(), (NUM_ROUNDS - 1) as Round); - assert_none!(dag_reader.highest_ordered_anchor_round(),); - assert_eq!( - dag_reader.highest_committed_anchor_round(), - LI_ROUNDS as Round - ); + assert_eq!(new_dag.lowest_round(), (LI_ROUNDS - DAG_WINDOW) as Round); + assert_eq!(new_dag.highest_round(), (NUM_ROUNDS - 1) as Round); + assert_none!(new_dag.highest_ordered_anchor_round(),); + assert_eq!(new_dag.highest_committed_anchor_round(), LI_ROUNDS as Round); } diff --git a/consensus/src/dag/tests/integration_tests.rs b/consensus/src/dag/tests/integration_tests.rs index 6425edc02e9b2..ace3e4d0941b2 100644 --- a/consensus/src/dag/tests/integration_tests.rs +++ b/consensus/src/dag/tests/integration_tests.rs @@ -2,12 +2,12 @@ use super::dag_test; use crate::{ - dag::bootstrap::bootstrap_dag, + dag::{bootstrap::bootstrap_dag_for_test, types::CertifiedNodeMessage}, experimental::buffer_manager::OrderedBlocks, network::{DAGNetworkSenderImpl, IncomingDAGRequest, NetworkSender}, network_interface::{ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC}, network_tests::{NetworkPlayground, TwinId}, - test_utils::{consensus_runtime, MockPayloadManager, MockStorage}, + test_utils::{consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStorage}, }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::network_id::{NetworkId, PeerNetworkId}; @@ -32,16 +32,17 @@ use aptos_types::{ }; use claims::assert_gt; use futures::{ - stream::{select, AbortHandle, Select}, + stream::{select, Select}, StreamExt, }; use futures_channel::mpsc::UnboundedReceiver; use maplit::hashmap; use std::sync::Arc; +use tokio::task::JoinHandle; struct DagBootstrapUnit { - nh_abort_handle: AbortHandle, - df_abort_handle: AbortHandle, + nh_task_handle: JoinHandle, + df_task_handle: JoinHandle<()>, dag_rpc_tx: aptos_channel::Sender, network_events: Box, aptos_channels::Receiver>>>, @@ -71,22 +72,26 @@ impl DagBootstrapUnit { let payload_client = Arc::new(MockPayloadManager::new(None)); - let (nh_abort_handle, df_abort_handle, dag_rpc_tx, ordered_nodes_rx) = bootstrap_dag( - self_peer, - signer, - Arc::new(epoch_state), - storage.get_ledger_info(), - Arc::new(dag_storage), - network.clone(), - network.clone(), - time_service, - payload_client, - ); + let state_computer = Arc::new(EmptyStateComputer {}); + + let (nh_abort_handle, df_abort_handle, dag_rpc_tx, ordered_nodes_rx) = + bootstrap_dag_for_test( + self_peer, + signer, + Arc::new(epoch_state), + storage.get_ledger_info(), + Arc::new(dag_storage), + network.clone(), + network.clone(), + time_service, + payload_client, + state_computer, + ); ( Self { - nh_abort_handle, - df_abort_handle, + nh_task_handle: nh_abort_handle, + df_task_handle: df_abort_handle, dag_rpc_tx, network_events, }, diff --git a/consensus/src/dag/tests/order_rule_tests.rs b/consensus/src/dag/tests/order_rule_tests.rs index c403820cd89ca..f597a8dc428ce 100644 --- a/consensus/src/dag/tests/order_rule_tests.rs +++ b/consensus/src/dag/tests/order_rule_tests.rs @@ -87,7 +87,7 @@ pub struct TestNotifier { #[async_trait] impl Notifier for TestNotifier { fn send_ordered_nodes( - &mut self, + &self, ordered_nodes: Vec>, _failed_authors: Vec<(Round, Author)>, ) -> anyhow::Result<()> { @@ -118,7 +118,7 @@ fn create_order_rule( ledger_info, dag, anchor_election, - Box::new(TestNotifier { tx }), + Arc::new(TestNotifier { tx }), Arc::new(MockStorage::new()), ), rx, diff --git a/consensus/src/dag/tests/rb_handler_tests.rs b/consensus/src/dag/tests/rb_handler_tests.rs index d49503c5d1f4f..f79b1cd2f471a 100644 --- a/consensus/src/dag/tests/rb_handler_tests.rs +++ b/consensus/src/dag/tests/rb_handler_tests.rs @@ -38,6 +38,7 @@ async fn test_node_broadcast_receiver_succeed() { epoch: 1, verifier: validator_verifier.clone(), }); + let signers: Vec<_> = signers.into_iter().map(Arc::new).collect(); // Scenario: Start DAG from beginning let storage = Arc::new(MockStorage::new()); @@ -80,6 +81,7 @@ async fn test_node_broadcast_receiver_failure() { epoch: 1, verifier: validator_verifier.clone(), }); + let signers: Vec<_> = signers.into_iter().map(Arc::new).collect(); let mut rb_receivers: Vec<_> = signers .iter() @@ -156,10 +158,12 @@ async fn test_node_broadcast_receiver_failure() { #[test] fn test_node_broadcast_receiver_storage() { let (signers, validator_verifier) = random_validator_verifier(4, None, false); + let signers: Vec<_> = signers.into_iter().map(Arc::new).collect(); let epoch_state = Arc::new(EpochState { epoch: 1, verifier: validator_verifier, }); + let storage = Arc::new(MockStorage::new()); let dag = Arc::new(RwLock::new(Dag::new( epoch_state.clone(), diff --git a/consensus/src/dag/types.rs b/consensus/src/dag/types.rs index bdbbea9ace3b6..927908f83564d 100644 --- a/consensus/src/dag/types.rs +++ b/consensus/src/dag/types.rs @@ -431,13 +431,13 @@ impl CertifiedNodeMessage { } } - pub fn ledger_info(&self) -> &LedgerInfoWithSignatures { - &self.ledger_info - } - pub fn certified_node(self) -> CertifiedNode { self.inner } + + pub fn ledger_info(&self) -> &LedgerInfoWithSignatures { + &self.ledger_info + } } impl Deref for CertifiedNodeMessage {