From b00b8638bc703a47e080a57564461b5de413d8eb Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Thu, 7 Sep 2023 13:36:49 -0700 Subject: [PATCH] [dag] dag rebootstrap --- consensus/src/dag/adapter.rs | 6 +- consensus/src/dag/bootstrap.rs | 340 ++++++++++++++----- consensus/src/dag/dag_driver.rs | 17 +- consensus/src/dag/dag_handler.rs | 30 +- consensus/src/dag/dag_state_sync.rs | 136 ++++---- consensus/src/dag/order_rule.rs | 4 +- consensus/src/dag/tests/dag_driver_tests.rs | 2 +- consensus/src/dag/tests/integration_tests.rs | 45 ++- consensus/src/dag/tests/order_rule_tests.rs | 12 +- consensus/src/dag/types.rs | 12 +- 10 files changed, 416 insertions(+), 188 deletions(-) diff --git a/consensus/src/dag/adapter.rs b/consensus/src/dag/adapter.rs index d96eb6d293e483..b2f8a30493dd66 100644 --- a/consensus/src/dag/adapter.rs +++ b/consensus/src/dag/adapter.rs @@ -22,9 +22,9 @@ use futures_channel::mpsc::UnboundedSender; use std::sync::Arc; #[async_trait] -pub trait Notifier: Send { +pub trait Notifier: Send + Sync { fn send_ordered_nodes( - &mut self, + &self, ordered_nodes: Vec>, failed_author: Vec<(Round, Author)>, ) -> anyhow::Result<()>; @@ -54,7 +54,7 @@ impl NotificationAdapter { #[async_trait] impl Notifier for NotificationAdapter { fn send_ordered_nodes( - &mut self, + &self, ordered_nodes: Vec>, failed_author: Vec<(Round, Author)>, ) -> anyhow::Result<()> { diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index e8a8674abb9435..a8f7544e31a6da 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -1,11 +1,13 @@ // Copyright © Aptos Foundation use super::{ + adapter::Notifier, anchor_election::RoundRobinAnchorElection, dag_driver::DagDriver, dag_fetcher::{DagFetcherService, FetchRequestHandler}, dag_handler::NetworkHandler, dag_network::TDAGNetworkSender, + dag_state_sync::{DagStateSynchronizer, StateSyncTrigger}, dag_store::Dag, order_rule::OrderRule, rb_handler::NodeBroadcastHandler, @@ -13,120 +15,288 @@ use super::{ types::DAGMessage, }; use crate::{ - dag::adapter::NotificationAdapter, experimental::buffer_manager::OrderedBlocks, - network::IncomingDAGRequest, state_replication::PayloadClient, + dag::{adapter::NotificationAdapter, dag_fetcher::DagFetcher}, + experimental::buffer_manager::OrderedBlocks, + network::IncomingDAGRequest, + state_replication::{PayloadClient, StateComputer}, }; -use aptos_channels::{aptos_channel, message_queues::QueueStyle}; -use aptos_consensus_types::common::Author; +use aptos_channels::{ + aptos_channel::{self, Receiver}, + message_queues::QueueStyle, +}; +use aptos_consensus_types::common::{Author, Round}; +use aptos_crypto::HashValue; use aptos_infallible::RwLock; +use aptos_logger::error; use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast}; +use aptos_storage_interface::DbReader; use aptos_types::{ - epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner, + aggregate_signature::AggregateSignature, + block_info::BlockInfo, + epoch_state::EpochState, + ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, + validator_signer::ValidatorSigner, +}; +use futures::{FutureExt, StreamExt}; +use futures_channel::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + oneshot, }; -use futures::stream::{AbortHandle, Abortable}; use std::{sync::Arc, time::Duration}; +use tokio::{select, task::JoinHandle}; use tokio_retry::strategy::ExponentialBackoff; -pub fn bootstrap_dag( +struct DagBootstrapper { self_peer: Author, signer: ValidatorSigner, epoch_state: Arc, - latest_ledger_info: LedgerInfo, + db: Arc, storage: Arc, rb_network_sender: Arc>, dag_network_sender: Arc, time_service: aptos_time_service::TimeService, payload_client: Arc, -) -> ( - AbortHandle, - AbortHandle, - aptos_channel::Sender, - futures_channel::mpsc::UnboundedReceiver, -) { - let validators = epoch_state.verifier.get_ordered_account_addresses(); - let current_round = latest_ledger_info.round(); + state_computer: Arc, +} - let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded(); - let adapter = Box::new(NotificationAdapter::new(ordered_nodes_tx, storage.clone())); - let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None); +impl DagBootstrapper { + fn new( + self_peer: Author, + signer: ValidatorSigner, + epoch_state: Arc, + db: Arc, + storage: Arc, + rb_network_sender: Arc>, + dag_network_sender: Arc, + time_service: aptos_time_service::TimeService, + payload_client: Arc, + state_computer: Arc, + ) -> Self { + Self { + self_peer, + signer, + epoch_state, + db, + storage, + rb_network_sender, + dag_network_sender, + time_service, + payload_client, + state_computer, + } + } - // A backoff policy that starts at 100ms and doubles each iteration. - let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50); - let rb = Arc::new(ReliableBroadcast::new( - validators.clone(), - rb_network_sender, - rb_backoff_policy, - time_service.clone(), - // TODO: add to config - Duration::from_millis(500), - )); - - let dag = Arc::new(RwLock::new(Dag::new( - epoch_state.clone(), - storage.clone(), - current_round, - 0, - ))); - - let anchor_election = Box::new(RoundRobinAnchorElection::new(validators)); - let order_rule = OrderRule::new( - epoch_state.clone(), - latest_ledger_info, - dag.clone(), - anchor_election, - adapter, - storage.clone(), - ); + fn bootstrap_dag_store( + &self, + initial_round: Round, + latest_ledger_info: LedgerInfo, + notifier: Arc, + ) -> (Arc>, OrderRule) { + let dag = Arc::new(RwLock::new(Dag::new( + self.epoch_state.clone(), + self.storage.clone(), + initial_round, + latest_ledger_info.round(), + ))); - let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) = - DagFetcherService::new( - epoch_state.clone(), - dag_network_sender, + let validators = self.epoch_state.verifier.get_ordered_account_addresses(); + let anchor_election = Box::new(RoundRobinAnchorElection::new(validators)); + + let order_rule = OrderRule::new( + self.epoch_state.clone(), + latest_ledger_info, dag.clone(), - time_service.clone(), + anchor_election, + notifier, + self.storage.clone(), ); - let fetch_requester = Arc::new(fetch_requester); - let dag_driver = DagDriver::new( + (dag, order_rule) + } + + fn bootstrap_components( + &self, + dag: Arc>, + order_rule: OrderRule, + state_sync_trigger: StateSyncTrigger, + ) -> (NetworkHandler, DagFetcherService) { + let validators = self.epoch_state.verifier.get_ordered_account_addresses(); + + // A backoff policy that starts at 100ms and doubles each iteration. + let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50); + let rb = Arc::new(ReliableBroadcast::new( + validators.clone(), + self.rb_network_sender.clone(), + rb_backoff_policy, + self.time_service.clone(), + // TODO: add to config + Duration::from_millis(500), + )); + + let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) = + DagFetcherService::new( + self.epoch_state.clone(), + self.dag_network_sender.clone(), + dag.clone(), + self.time_service.clone(), + ); + let fetch_requester = Arc::new(fetch_requester); + + let dag_driver = DagDriver::new( + self.self_peer, + self.epoch_state.clone(), + dag.clone(), + self.payload_client.clone(), + rb, + self.time_service.clone(), + self.storage.clone(), + order_rule, + fetch_requester.clone(), + ); + let rb_handler = NodeBroadcastHandler::new( + dag.clone(), + self.signer.clone(), + self.epoch_state.clone(), + self.storage.clone(), + fetch_requester, + ); + let fetch_handler = FetchRequestHandler::new(dag, self.epoch_state.clone()); + + let dag_handler = NetworkHandler::new( + self.epoch_state.clone(), + rb_handler, + dag_driver, + fetch_handler, + node_fetch_waiter, + certified_node_fetch_waiter, + state_sync_trigger, + ); + + (dag_handler, dag_fetcher) + } + + async fn bootstrapper( + self, + mut dag_rpc_rx: Receiver, + ordered_nodes_tx: UnboundedSender, + shutdown_rx: oneshot::Receiver<()>, + ) -> anyhow::Result<()> { + let adapter = Arc::new(NotificationAdapter::new( + ordered_nodes_tx, + self.storage.clone(), + )); + + let sync_manager = DagStateSynchronizer::new( + self.epoch_state.clone(), + adapter.clone(), + self.time_service.clone(), + self.state_computer.clone(), + self.storage.clone(), + ); + + // TODO: fetch the correct block info + let ledger_info = LedgerInfoWithSignatures::new( + LedgerInfo::new(BlockInfo::empty(), HashValue::zero()), + AggregateSignature::empty(), + ); + + let mut shutdown_rx = shutdown_rx.into_stream(); + + loop { + let (rebootstrap_notification_tx, mut rebootstrap_notification_rx) = + tokio::sync::mpsc::channel(1); + + // TODO: fix + let (dag_store, order_rule) = + self.bootstrap_dag_store(0, ledger_info.ledger_info().clone(), adapter.clone()); + + let state_sync_trigger = StateSyncTrigger::new( + dag_store.clone(), + adapter.clone(), + rebootstrap_notification_tx, + ); + + let (handler, fetch_service) = + self.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger); + + let df_handle = tokio::spawn(fetch_service.start()); + + // poll the network handler while waiting for rebootstrap notification or shutdown notification + select! { + biased; + _ = shutdown_rx.select_next_some() => { + df_handle.abort(); + let _ = df_handle.await; + return Ok(()); + }, + Some(node) = rebootstrap_notification_rx.recv() => { + df_handle.abort(); + let _ = df_handle.await; + + let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone()); + + if let Err(e) = sync_manager.sync_dag_to(&node, dag_fetcher, dag_store.clone()).await { + error!(error = ?e, "unable to sync"); + } + }, + _ = handler.start(&mut dag_rpc_rx) => {} + } + } + } +} + +pub(super) fn bootstrap_dag_for_test( + self_peer: Author, + signer: ValidatorSigner, + epoch_state: Arc, + latest_ledger_info: LedgerInfo, + storage: Arc, + rb_network_sender: Arc>, + dag_network_sender: Arc, + time_service: aptos_time_service::TimeService, + payload_client: Arc, + db: Arc, + state_computer: Arc, +) -> ( + JoinHandle<()>, + JoinHandle<()>, + aptos_channel::Sender, + UnboundedReceiver, +) { + let bootstraper = DagBootstrapper::new( self_peer, - epoch_state.clone(), - dag.clone(), - payload_client, - rb, - time_service, - storage.clone(), - order_rule, - fetch_requester.clone(), - ); - let rb_handler = NodeBroadcastHandler::new( - dag.clone(), signer, - epoch_state.clone(), + epoch_state, + db, storage.clone(), - fetch_requester, + rb_network_sender, + dag_network_sender, + time_service, + payload_client, + state_computer, ); - let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone()); - let dag_handler = NetworkHandler::new( - epoch_state, - dag_rpc_rx, - rb_handler, - dag_driver, - fetch_handler, - node_fetch_waiter, - certified_node_fetch_waiter, + let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded(); + let adapter = Arc::new(NotificationAdapter::new(ordered_nodes_tx, storage.clone())); + let (dag_rpc_tx, mut dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None); + let (rebootstrap_notification_tx, _rebootstrap_notification_rx) = tokio::sync::mpsc::channel(1); + + let (dag_store, order_rule) = + bootstraper.bootstrap_dag_store(0, latest_ledger_info, adapter.clone()); + + let state_sync_trigger = StateSyncTrigger::new( + dag_store.clone(), + adapter.clone(), + rebootstrap_notification_tx, ); - let (nh_abort_handle, nh_abort_registration) = AbortHandle::new_pair(); - let (df_abort_handle, df_abort_registration) = AbortHandle::new_pair(); + let (handler, fetch_service) = + bootstraper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger); - tokio::spawn(Abortable::new(dag_handler.start(), nh_abort_registration)); - tokio::spawn(Abortable::new(dag_fetcher.start(), df_abort_registration)); + let dh_handle = tokio::spawn(async move { + handler.start(&mut dag_rpc_rx).await + }); + let df_handle = tokio::spawn(fetch_service.start()); - ( - nh_abort_handle, - df_abort_handle, - dag_rpc_tx, - ordered_nodes_rx, - ) + (dh_handle, df_handle, dag_rpc_tx, ordered_nodes_rx) } diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index 3e79d444b6183a..ddd977e3a9a030 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -5,7 +5,7 @@ use super::{ dag_fetcher::FetchRequester, order_rule::OrderRule, storage::DAGStorage, - types::{CertifiedAck, DAGMessage, Extensions}, + types::{CertifiedAck, CertifiedNodeMessage, DAGMessage, Extensions}, RpcHandler, }; use crate::{ @@ -22,7 +22,12 @@ use aptos_infallible::RwLock; use aptos_logger::error; use aptos_reliable_broadcast::ReliableBroadcast; use aptos_time_service::{TimeService, TimeServiceTrait}; -use aptos_types::{block_info::Round, epoch_state::EpochState}; +use aptos_types::{ + aggregate_signature::AggregateSignature, + block_info::{BlockInfo, Round}, + epoch_state::EpochState, + ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, +}; use futures::{ future::{AbortHandle, Abortable}, FutureExt, @@ -156,7 +161,13 @@ impl DagDriver { .broadcast(node.clone(), signature_builder) .then(move |certificate| { let certified_node = CertifiedNode::new(node, certificate.signatures().to_owned()); - rb.broadcast(certified_node, cert_ack_set) + // TODO: fetch the correct block info + let ledger_info = LedgerInfoWithSignatures::new( + LedgerInfo::new(BlockInfo::empty(), certified_node.digest()), + AggregateSignature::empty(), + ); + let certified_node_msg = CertifiedNodeMessage::new(certified_node, ledger_info); + rb.broadcast(certified_node_msg, cert_ack_set) }); tokio::spawn(Abortable::new(task, abort_registration)); if let Some(prev_handle) = self.rb_abort_handle.replace(abort_handle) { diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs index 62e83ea056e993..3a9c35cbff8997 100644 --- a/consensus/src/dag/dag_handler.rs +++ b/consensus/src/dag/dag_handler.rs @@ -3,6 +3,7 @@ use super::{ dag_driver::DagDriver, dag_fetcher::{FetchRequestHandler, FetchWaiter}, + dag_state_sync::StateSyncTrigger, types::TDAGMessage, CertifiedNode, Node, }; @@ -23,40 +24,43 @@ use tokio::select; pub(crate) struct NetworkHandler { epoch_state: Arc, - dag_rpc_rx: aptos_channel::Receiver, node_receiver: NodeBroadcastHandler, dag_driver: DagDriver, fetch_receiver: FetchRequestHandler, node_fetch_waiter: FetchWaiter, certified_node_fetch_waiter: FetchWaiter, + state_sync_trigger: StateSyncTrigger, } impl NetworkHandler { - pub fn new( + pub(super) fn new( epoch_state: Arc, - dag_rpc_rx: aptos_channel::Receiver, node_receiver: NodeBroadcastHandler, dag_driver: DagDriver, fetch_receiver: FetchRequestHandler, node_fetch_waiter: FetchWaiter, certified_node_fetch_waiter: FetchWaiter, + state_sync_trigger: StateSyncTrigger, ) -> Self { Self { epoch_state, - dag_rpc_rx, node_receiver, dag_driver, fetch_receiver, node_fetch_waiter, certified_node_fetch_waiter, + state_sync_trigger, } } - pub async fn start(mut self) { + pub async fn start( + mut self, + dag_rpc_rx: &mut aptos_channel::Receiver, + ) { // TODO(ibalajiarun): clean up Reliable Broadcast storage periodically. loop { select! { - Some(msg) = self.dag_rpc_rx.next() => { + Some(msg) = dag_rpc_rx.next() => { if let Err(e) = self.process_rpc(msg).await { warn!(error = ?e, "error processing rpc"); } @@ -90,10 +94,15 @@ impl NetworkHandler { .verify(&self.epoch_state.verifier) .and_then(|_| self.node_receiver.process(node)) .map(|r| r.into()), - DAGMessage::CertifiedNodeMsg(node) => node - .verify(&self.epoch_state.verifier) - .and_then(|_| self.dag_driver.process(node)) - .map(|r| r.into()), + DAGMessage::CertifiedNodeMsg(node) => match node.verify(&self.epoch_state.verifier) { + Ok(_) => { + let node = self.state_sync_trigger.check(node).await; + self.dag_driver + .process(node.certified_node()) + .map(|r| r.into()) + }, + Err(e) => Err(e), + }, DAGMessage::FetchRequest(request) => request .verify(&self.epoch_state.verifier) .and_then(|_| self.fetch_receiver.process(request)) @@ -117,5 +126,6 @@ impl NetworkHandler { .response_sender .send(response) .map_err(|_| anyhow::anyhow!("unable to respond to rpc")) + .into() } } diff --git a/consensus/src/dag/dag_state_sync.rs b/consensus/src/dag/dag_state_sync.rs index c6c9b8bfea078d..d5eb0a05a6a899 100644 --- a/consensus/src/dag/dag_state_sync.rs +++ b/consensus/src/dag/dag_state_sync.rs @@ -2,7 +2,7 @@ use super::{ adapter::Notifier, - dag_fetcher::{DagFetcher, TDagFetcher}, + dag_fetcher::TDagFetcher, dag_store::Dag, storage::DAGStorage, types::{CertifiedNodeMessage, RemoteFetchRequest}, @@ -27,66 +27,64 @@ pub const STATE_SYNC_WINDOW_MULTIPLIER: u64 = 30; #[async_trait] pub trait TUpstreamNotifier: Send {} -pub(super) struct StateSyncManager { - epoch_state: Arc, - network: Arc, - notifier: Arc, - time_service: TimeService, - state_computer: Arc, - storage: Arc, +pub(super) struct StateSyncTrigger { dag_store: Arc>, + downstream_notifier: Arc, + bootstrap_notifier: tokio::sync::mpsc::Sender, } -impl StateSyncManager { - pub fn new( - epoch_state: Arc, - network: Arc, - notifier: Arc, - time_service: TimeService, - state_computer: Arc, - storage: Arc, +impl StateSyncTrigger { + pub(super) fn new( dag_store: Arc>, + downstream_notifier: Arc, + bootstrap_notifier: tokio::sync::mpsc::Sender, ) -> Self { Self { - epoch_state, - network, - notifier, - time_service, - state_computer, - storage, dag_store, + downstream_notifier, + bootstrap_notifier, } } - pub async fn sync_to( - &self, - node: &CertifiedNodeMessage, - ) -> anyhow::Result>>> { - self.sync_to_highest_commit_cert(node.ledger_info()).await; - self.try_sync_to_highest_ordered_anchor(node).await + /// This method checks if a state sync is required, and if so, + /// notifies the bootstraper and yields the current task infinitely, + /// to let the bootstraper can abort this task. + pub(super) async fn check(&self, node: CertifiedNodeMessage) -> CertifiedNodeMessage { + let ledger_info = node.ledger_info(); + + self.notify_commit_proof(ledger_info).await; + + if self.need_sync_for_ledger_info(ledger_info) { + self.bootstrap_notifier.send(node).await; + loop { + tokio::task::yield_now().await; + } + } + node } /// Fast forward in the decoupled-execution pipeline if the block exists there - pub async fn sync_to_highest_commit_cert(&self, ledger_info: &LedgerInfoWithSignatures) { - let dag_reader = self.dag_store.read(); - + async fn notify_commit_proof(&self, ledger_info: &LedgerInfoWithSignatures) { // if the anchor exists between ledger info round and highest ordered round // Note: ledger info round <= highest ordered round - if dag_reader - .highest_committed_anchor_round() + if self.dag_store.read().highest_committed_anchor_round() < ledger_info.commit_info().round() - && dag_reader + && self + .dag_store + .read() .highest_ordered_anchor_round() .unwrap_or_default() >= ledger_info.commit_info().round() { - self.notifier.send_commit_proof(ledger_info.clone()).await + self.downstream_notifier + .send_commit_proof(ledger_info.clone()) + .await } } /// Check if we're far away from this ledger info and need to sync. /// This ensures that the block referred by the ledger info is not in buffer manager. - pub fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool { + fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool { let dag_reader = self.dag_store.read(); // check whether if DAG order round is behind the given ledger info round // (meaning consensus is behind) or @@ -96,39 +94,59 @@ impl StateSyncManager { .highest_ordered_anchor_round() .unwrap_or_default() < li.commit_info().round()) - || dag_reader - .highest_committed_anchor_round() + || dag_reader.highest_committed_anchor_round() + STATE_SYNC_WINDOW_MULTIPLIER * DAG_WINDOW < li.commit_info().round() } +} - pub async fn try_sync_to_highest_ordered_anchor( - &self, - node: &CertifiedNodeMessage, - ) -> anyhow::Result>>> { - // Check whether to actually sync - let commit_li = node.ledger_info(); - if !self.need_sync_for_ledger_info(commit_li) { - return Ok(None); - } - - let dag_fetcher = Arc::new(DagFetcher::new( - self.epoch_state.clone(), - self.network.clone(), - self.time_service.clone(), - )); +pub(super) struct DagStateSynchronizer { + epoch_state: Arc, + notifier: Arc, + time_service: TimeService, + state_computer: Arc, + storage: Arc, +} - self.sync_to_highest_ordered_anchor(node, dag_fetcher).await +impl DagStateSynchronizer { + pub fn new( + epoch_state: Arc, + notifier: Arc, + time_service: TimeService, + state_computer: Arc, + storage: Arc, + ) -> Self { + Self { + epoch_state, + notifier, + time_service, + state_computer, + storage, + } } /// Note: Assumes that the sync checks have been done - pub async fn sync_to_highest_ordered_anchor( + pub async fn sync_dag_to( &self, node: &CertifiedNodeMessage, - dag_fetcher: Arc, - ) -> anyhow::Result>>> { + dag_fetcher: impl TDagFetcher, + current_dag_store: Arc>, + ) -> anyhow::Result<()> { let commit_li = node.ledger_info(); + { + let dag_reader = current_dag_store.read(); + assert!( + dag_reader + .highest_ordered_anchor_round() + .unwrap_or_default() + < commit_li.commit_info().round() + || dag_reader.highest_committed_anchor_round() + + STATE_SYNC_WINDOW_MULTIPLIER * DAG_WINDOW + < commit_li.commit_info().round() + ); + } + if commit_li.ledger_info().ends_epoch() { self.notifier .send_epoch_change(EpochChangeProof::new( @@ -137,7 +155,7 @@ impl StateSyncManager { )) .await; // TODO: make sure to terminate DAG and yield to epoch manager - return Ok(None); + return Ok(()); } // TODO: there is a case where DAG fetches missing nodes in window and a crash happens and when we restart, @@ -180,6 +198,6 @@ impl StateSyncManager { // TODO: the caller should rebootstrap the order rule - Ok(Some(sync_dag_store)) + Ok(()) } } diff --git a/consensus/src/dag/order_rule.rs b/consensus/src/dag/order_rule.rs index 52687efd80cdef..b86ebaede12b33 100644 --- a/consensus/src/dag/order_rule.rs +++ b/consensus/src/dag/order_rule.rs @@ -20,7 +20,7 @@ pub struct OrderRule { lowest_unordered_anchor_round: Round, dag: Arc>, anchor_election: Box, - notifier: Box, + notifier: Arc, storage: Arc, } @@ -30,7 +30,7 @@ impl OrderRule { latest_ledger_info: LedgerInfo, dag: Arc>, anchor_election: Box, - notifier: Box, + notifier: Arc, storage: Arc, ) -> Self { // TODO: we need to initialize the anchor election based on the dag diff --git a/consensus/src/dag/tests/dag_driver_tests.rs b/consensus/src/dag/tests/dag_driver_tests.rs index 94e1f9036cd1c5..1202d52d7af202 100644 --- a/consensus/src/dag/tests/dag_driver_tests.rs +++ b/consensus/src/dag/tests/dag_driver_tests.rs @@ -98,7 +98,7 @@ async fn test_certified_node_handler() { LedgerInfo::mock_genesis(None), dag.clone(), Box::new(RoundRobinAnchorElection::new(validators)), - Box::new(TestNotifier { tx }), + Arc::new(TestNotifier { tx }), storage.clone(), ); diff --git a/consensus/src/dag/tests/integration_tests.rs b/consensus/src/dag/tests/integration_tests.rs index 1288716fa8df7b..a7167de6861922 100644 --- a/consensus/src/dag/tests/integration_tests.rs +++ b/consensus/src/dag/tests/integration_tests.rs @@ -2,12 +2,14 @@ use super::dag_test; use crate::{ - dag::bootstrap::bootstrap_dag, + dag::bootstrap::bootstrap_dag_for_test, experimental::buffer_manager::OrderedBlocks, network::{DAGNetworkSenderImpl, IncomingDAGRequest, NetworkSender}, network_interface::{ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC}, network_tests::{NetworkPlayground, TwinId}, - test_utils::{consensus_runtime, MockPayloadManager, MockStorage}, + test_utils::{ + consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStateComputer, MockStorage, + }, }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::network_id::{NetworkId, PeerNetworkId}; @@ -23,6 +25,7 @@ use aptos_network::{ transport::ConnectionMetadata, ProtocolId, }; +use aptos_storage_interface::mock::MockDbReaderWriter; use aptos_time_service::TimeService; use aptos_types::{ epoch_state::EpochState, @@ -37,10 +40,11 @@ use futures::{ use futures_channel::mpsc::UnboundedReceiver; use maplit::hashmap; use std::sync::Arc; +use tokio::task::JoinHandle; struct DagBootstrapUnit { - nh_abort_handle: AbortHandle, - df_abort_handle: AbortHandle, + nh_task_handle: JoinHandle<()>, + df_task_handle: JoinHandle<()>, dag_rpc_tx: aptos_channel::Sender, network_events: Box, aptos_channels::Receiver>>>, @@ -68,22 +72,29 @@ impl DagBootstrapUnit { let payload_client = Arc::new(MockPayloadManager::new(None)); - let (nh_abort_handle, df_abort_handle, dag_rpc_tx, ordered_nodes_rx) = bootstrap_dag( - self_peer, - signer, - Arc::new(epoch_state), - storage.get_ledger_info(), - Arc::new(dag_storage), - network.clone(), - network.clone(), - time_service, - payload_client, - ); + let db = Arc::new(MockDbReaderWriter {}); + + let state_computer = Arc::new(EmptyStateComputer {}); + + let (nh_abort_handle, df_abort_handle, dag_rpc_tx, ordered_nodes_rx) = + bootstrap_dag_for_test( + self_peer, + signer, + Arc::new(epoch_state), + storage.get_ledger_info(), + Arc::new(dag_storage), + network.clone(), + network.clone(), + time_service, + payload_client, + db, + state_computer, + ); ( Self { - nh_abort_handle, - df_abort_handle, + nh_task_handle: nh_abort_handle, + df_task_handle: df_abort_handle, dag_rpc_tx, network_events, }, diff --git a/consensus/src/dag/tests/order_rule_tests.rs b/consensus/src/dag/tests/order_rule_tests.rs index 4772366f345acf..5e411fe9a49108 100644 --- a/consensus/src/dag/tests/order_rule_tests.rs +++ b/consensus/src/dag/tests/order_rule_tests.rs @@ -3,20 +3,22 @@ use crate::{ dag::{ + adapter::Notifier, anchor_election::RoundRobinAnchorElection, dag_store::Dag, order_rule::OrderRule, tests::{dag_test::MockStorage, helpers::new_certified_node}, types::{NodeCertificate, NodeMetadata}, - CertifiedNode, adapter::Notifier, + CertifiedNode, }, test_utils::placeholder_ledger_info, }; use aptos_consensus_types::common::{Author, Round}; use aptos_infallible::{Mutex, RwLock}; use aptos_types::{ - aggregate_signature::AggregateSignature, epoch_state::EpochState, - validator_verifier::random_validator_verifier, epoch_change::EpochChangeProof, ledger_info::LedgerInfoWithSignatures, + aggregate_signature::AggregateSignature, epoch_change::EpochChangeProof, + epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, + validator_verifier::random_validator_verifier, }; use async_trait::async_trait; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; @@ -125,7 +127,7 @@ pub struct TestNotifier { #[async_trait] impl Notifier for TestNotifier { fn send_ordered_nodes( - &mut self, + &self, ordered_nodes: Vec>, _failed_authors: Vec<(Round, Author)>, ) -> anyhow::Result<()> { @@ -156,7 +158,7 @@ fn create_order_rule( ledger_info, dag, anchor_election, - Box::new(TestNotifier { tx }), + Arc::new(TestNotifier { tx }), Arc::new(MockStorage::new()), ), rx, diff --git a/consensus/src/dag/types.rs b/consensus/src/dag/types.rs index 56c8d190bf32bf..3f53aa34527780 100644 --- a/consensus/src/dag/types.rs +++ b/consensus/src/dag/types.rs @@ -409,8 +409,10 @@ impl Deref for CertifiedNode { } } -impl TDAGMessage for CertifiedNode { +impl TDAGMessage for CertifiedNodeMessage { fn verify(&self, verifier: &ValidatorVerifier) -> anyhow::Result<()> { + self.ledger_info().verify_signatures(verifier); + ensure!(self.digest() == self.calculate_digest(), "invalid digest"); verifier @@ -433,6 +435,10 @@ impl CertifiedNodeMessage { } } + pub fn certified_node(self) -> CertifiedNode { + self.inner + } + pub fn ledger_info(&self) -> &LedgerInfoWithSignatures { &self.ledger_info } @@ -533,7 +539,7 @@ impl CertifiedAck { impl BroadcastStatus for CertificateAckState { type Ack = CertifiedAck; type Aggregated = (); - type Message = CertifiedNode; + type Message = CertifiedNodeMessage; fn add(&mut self, peer: Author, _ack: Self::Ack) -> anyhow::Result> { self.received.insert(peer); @@ -647,7 +653,7 @@ impl core::fmt::Debug for DAGNetworkMessage { pub enum DAGMessage { NodeMsg(Node), VoteMsg(Vote), - CertifiedNodeMsg(CertifiedNode), + CertifiedNodeMsg(CertifiedNodeMessage), CertifiedAckMsg(CertifiedAck), FetchRequest(RemoteFetchRequest), FetchResponse(FetchResponse),