From 27c302113dc87273f295ece315d2028dacae729b Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Thu, 7 Sep 2023 13:36:49 -0700 Subject: [PATCH 1/6] [dag] dag rebootstrap --- consensus/src/dag/adapter.rs | 6 +- consensus/src/dag/bootstrap.rs | 335 +++++++++++++----- consensus/src/dag/dag_driver.rs | 8 +- consensus/src/dag/dag_handler.rs | 30 +- consensus/src/dag/dag_state_sync.rs | 144 ++++---- consensus/src/dag/order_rule.rs | 2 +- consensus/src/dag/tests/dag_driver_tests.rs | 2 +- .../src/dag/tests/dag_state_sync_tests.rs | 4 +- consensus/src/dag/tests/integration_tests.rs | 42 ++- consensus/src/dag/tests/order_rule_tests.rs | 4 +- consensus/src/dag/types.rs | 8 +- 11 files changed, 394 insertions(+), 191 deletions(-) diff --git a/consensus/src/dag/adapter.rs b/consensus/src/dag/adapter.rs index 6e3091fd3421d..6c7846124bf91 100644 --- a/consensus/src/dag/adapter.rs +++ b/consensus/src/dag/adapter.rs @@ -31,9 +31,9 @@ use futures_channel::mpsc::UnboundedSender; use std::{collections::HashMap, sync::Arc}; #[async_trait] -pub trait Notifier: Send { +pub trait Notifier: Send + Sync { fn send_ordered_nodes( - &mut self, + &self, ordered_nodes: Vec>, failed_author: Vec<(Round, Author)>, ) -> anyhow::Result<()>; @@ -62,7 +62,7 @@ impl NotifierAdapter { #[async_trait] impl Notifier for NotifierAdapter { fn send_ordered_nodes( - &mut self, + &self, ordered_nodes: Vec>, failed_author: Vec<(Round, Author)>, ) -> anyhow::Result<()> { diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index a28c613e24c0c..45fd5b6dd9d7e 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -1,12 +1,13 @@ // Copyright © Aptos Foundation use super::{ + adapter::Notifier, anchor_election::RoundRobinAnchorElection, dag_driver::DagDriver, dag_fetcher::{DagFetcherService, FetchRequestHandler}, dag_handler::NetworkHandler, dag_network::TDAGNetworkSender, - dag_state_sync::DAG_WINDOW, + dag_state_sync::{DagStateSynchronizer, StateSyncTrigger, DAG_WINDOW}, dag_store::Dag, order_rule::OrderRule, rb_handler::NodeBroadcastHandler, @@ -14,120 +15,282 @@ use super::{ types::DAGMessage, }; use crate::{ - dag::adapter::NotifierAdapter, experimental::buffer_manager::OrderedBlocks, - network::IncomingDAGRequest, state_replication::PayloadClient, + dag::{adapter::NotifierAdapter, dag_fetcher::DagFetcher}, + experimental::buffer_manager::OrderedBlocks, + network::IncomingDAGRequest, + state_replication::{PayloadClient, StateComputer}, }; -use aptos_channels::{aptos_channel, message_queues::QueueStyle}; -use aptos_consensus_types::common::Author; +use aptos_channels::{ + aptos_channel::{self, Receiver}, + message_queues::QueueStyle, +}; +use aptos_consensus_types::common::{Author, Round}; +use aptos_crypto::HashValue; use aptos_infallible::RwLock; +use aptos_logger::error; use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast}; use aptos_types::{ - epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner, + aggregate_signature::AggregateSignature, + block_info::BlockInfo, + epoch_state::EpochState, + ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, + validator_signer::ValidatorSigner, +}; +use futures::{FutureExt, StreamExt}; +use futures_channel::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + oneshot, }; -use futures::stream::{AbortHandle, Abortable}; use std::{sync::Arc, time::Duration}; +use tokio::{select, task::JoinHandle}; use tokio_retry::strategy::ExponentialBackoff; -pub fn bootstrap_dag( +struct DagBootstrapper { self_peer: Author, signer: ValidatorSigner, epoch_state: Arc, - latest_ledger_info: LedgerInfo, storage: Arc, rb_network_sender: Arc>, dag_network_sender: Arc, time_service: aptos_time_service::TimeService, payload_client: Arc, -) -> ( - AbortHandle, - AbortHandle, - aptos_channel::Sender, - futures_channel::mpsc::UnboundedReceiver, -) { - let validators = epoch_state.verifier.get_ordered_account_addresses(); - let current_round = latest_ledger_info.round(); + state_computer: Arc, +} - let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded(); - let adapter = Box::new(NotifierAdapter::new(ordered_nodes_tx, storage.clone())); - let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None); +impl DagBootstrapper { + fn new( + self_peer: Author, + signer: ValidatorSigner, + epoch_state: Arc, + storage: Arc, + rb_network_sender: Arc>, + dag_network_sender: Arc, + time_service: aptos_time_service::TimeService, + payload_client: Arc, + state_computer: Arc, + ) -> Self { + Self { + self_peer, + signer, + epoch_state, + storage, + rb_network_sender, + dag_network_sender, + time_service, + payload_client, + state_computer, + } + } - // A backoff policy that starts at 100ms and doubles each iteration. - let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50); - let rb = Arc::new(ReliableBroadcast::new( - validators.clone(), - rb_network_sender, - rb_backoff_policy, - time_service.clone(), - // TODO: add to config - Duration::from_millis(500), - )); - - let dag = Arc::new(RwLock::new(Dag::new( - epoch_state.clone(), - storage.clone(), - current_round, - DAG_WINDOW, - ))); - - let anchor_election = Box::new(RoundRobinAnchorElection::new(validators)); - let order_rule = OrderRule::new( - epoch_state.clone(), - latest_ledger_info, - dag.clone(), - anchor_election, - adapter, - storage.clone(), - ); + fn bootstrap_dag_store( + &self, + initial_round: Round, + latest_ledger_info: LedgerInfo, + notifier: Arc, + ) -> (Arc>, OrderRule) { + let dag = Arc::new(RwLock::new(Dag::new( + self.epoch_state.clone(), + self.storage.clone(), + initial_round, + DAG_WINDOW, + ))); - let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) = - DagFetcherService::new( - epoch_state.clone(), - dag_network_sender, + let validators = self.epoch_state.verifier.get_ordered_account_addresses(); + let anchor_election = Box::new(RoundRobinAnchorElection::new(validators)); + + let order_rule = OrderRule::new( + self.epoch_state.clone(), + latest_ledger_info, dag.clone(), - time_service.clone(), + anchor_election, + notifier, + self.storage.clone(), ); - let fetch_requester = Arc::new(fetch_requester); - let dag_driver = DagDriver::new( + (dag, order_rule) + } + + fn bootstrap_components( + &self, + dag: Arc>, + order_rule: OrderRule, + state_sync_trigger: StateSyncTrigger, + ) -> (NetworkHandler, DagFetcherService) { + let validators = self.epoch_state.verifier.get_ordered_account_addresses(); + + // A backoff policy that starts at 100ms and doubles each iteration. + let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50); + let rb = Arc::new(ReliableBroadcast::new( + validators.clone(), + self.rb_network_sender.clone(), + rb_backoff_policy, + self.time_service.clone(), + // TODO: add to config + Duration::from_millis(500), + )); + + let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) = + DagFetcherService::new( + self.epoch_state.clone(), + self.dag_network_sender.clone(), + dag.clone(), + self.time_service.clone(), + ); + let fetch_requester = Arc::new(fetch_requester); + + let dag_driver = DagDriver::new( + self.self_peer, + self.epoch_state.clone(), + dag.clone(), + self.payload_client.clone(), + rb, + self.time_service.clone(), + self.storage.clone(), + order_rule, + fetch_requester.clone(), + ); + let rb_handler = NodeBroadcastHandler::new( + dag.clone(), + self.signer.clone(), + self.epoch_state.clone(), + self.storage.clone(), + fetch_requester, + ); + let fetch_handler = FetchRequestHandler::new(dag, self.epoch_state.clone()); + + let dag_handler = NetworkHandler::new( + self.epoch_state.clone(), + rb_handler, + dag_driver, + fetch_handler, + node_fetch_waiter, + certified_node_fetch_waiter, + state_sync_trigger, + ); + + (dag_handler, dag_fetcher) + } + + async fn bootstrapper( + self, + mut dag_rpc_rx: Receiver, + ordered_nodes_tx: UnboundedSender, + shutdown_rx: oneshot::Receiver<()>, + ) -> anyhow::Result<()> { + let adapter = Arc::new(NotifierAdapter::new( + ordered_nodes_tx, + self.storage.clone(), + )); + + let sync_manager = DagStateSynchronizer::new( + self.epoch_state.clone(), + adapter.clone(), + self.time_service.clone(), + self.state_computer.clone(), + self.storage.clone(), + ); + + // TODO: fetch the correct block info + let ledger_info = LedgerInfoWithSignatures::new( + LedgerInfo::new(BlockInfo::empty(), HashValue::zero()), + AggregateSignature::empty(), + ); + + let mut shutdown_rx = shutdown_rx.into_stream(); + + loop { + let (rebootstrap_notification_tx, mut rebootstrap_notification_rx) = + tokio::sync::mpsc::channel(1); + + // TODO: fix + let (dag_store, order_rule) = + self.bootstrap_dag_store(0, ledger_info.ledger_info().clone(), adapter.clone()); + + let state_sync_trigger = StateSyncTrigger::new( + dag_store.clone(), + adapter.clone(), + rebootstrap_notification_tx, + ); + + let (handler, fetch_service) = + self.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger); + + let df_handle = tokio::spawn(fetch_service.start()); + + // poll the network handler while waiting for rebootstrap notification or shutdown notification + select! { + biased; + _ = shutdown_rx.select_next_some() => { + df_handle.abort(); + let _ = df_handle.await; + return Ok(()); + }, + Some(node) = rebootstrap_notification_rx.recv() => { + df_handle.abort(); + let _ = df_handle.await; + + let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone()); + + if let Err(e) = sync_manager.sync_dag_to(&node, dag_fetcher, dag_store.clone()).await { + error!(error = ?e, "unable to sync"); + } + }, + _ = handler.start(&mut dag_rpc_rx) => {} + } + } + } +} + +pub(super) fn bootstrap_dag_for_test( + self_peer: Author, + signer: ValidatorSigner, + epoch_state: Arc, + latest_ledger_info: LedgerInfo, + storage: Arc, + rb_network_sender: Arc>, + dag_network_sender: Arc, + time_service: aptos_time_service::TimeService, + payload_client: Arc, + state_computer: Arc, +) -> ( + JoinHandle<()>, + JoinHandle<()>, + aptos_channel::Sender, + UnboundedReceiver, +) { + let bootstraper = DagBootstrapper::new( self_peer, - epoch_state.clone(), - dag.clone(), - payload_client, - rb, - time_service, - storage.clone(), - order_rule, - fetch_requester.clone(), - ); - let rb_handler = NodeBroadcastHandler::new( - dag.clone(), signer, - epoch_state.clone(), + epoch_state, storage.clone(), - fetch_requester, + rb_network_sender, + dag_network_sender, + time_service, + payload_client, + state_computer, ); - let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone()); - let dag_handler = NetworkHandler::new( - epoch_state, - dag_rpc_rx, - rb_handler, - dag_driver, - fetch_handler, - node_fetch_waiter, - certified_node_fetch_waiter, + let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded(); + let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, storage.clone())); + let (dag_rpc_tx, mut dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None); + let (rebootstrap_notification_tx, _rebootstrap_notification_rx) = tokio::sync::mpsc::channel(1); + + let (dag_store, order_rule) = + bootstraper.bootstrap_dag_store(0, latest_ledger_info, adapter.clone()); + + let state_sync_trigger = StateSyncTrigger::new( + dag_store.clone(), + adapter.clone(), + rebootstrap_notification_tx, ); - let (nh_abort_handle, nh_abort_registration) = AbortHandle::new_pair(); - let (df_abort_handle, df_abort_registration) = AbortHandle::new_pair(); + let (handler, fetch_service) = + bootstraper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger); - tokio::spawn(Abortable::new(dag_handler.start(), nh_abort_registration)); - tokio::spawn(Abortable::new(dag_fetcher.start(), df_abort_registration)); + let dh_handle = tokio::spawn(async move { + handler.start(&mut dag_rpc_rx).await + }); + let df_handle = tokio::spawn(fetch_service.start()); - ( - nh_abort_handle, - df_abort_handle, - dag_rpc_tx, - ordered_nodes_rx, - ) + (dh_handle, df_handle, dag_rpc_tx, ordered_nodes_rx) } diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index 9d9406725eea0..84e32feb0d579 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -22,7 +22,12 @@ use aptos_infallible::RwLock; use aptos_logger::error; use aptos_reliable_broadcast::ReliableBroadcast; use aptos_time_service::{TimeService, TimeServiceTrait}; -use aptos_types::{block_info::Round, epoch_state::EpochState}; +use aptos_types::{ + aggregate_signature::AggregateSignature, + block_info::{BlockInfo, Round}, + epoch_state::EpochState, + ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, +}; use futures::{ future::{AbortHandle, Abortable}, FutureExt, @@ -160,7 +165,6 @@ impl DagDriver { .broadcast(node.clone(), signature_builder) .then(move |certificate| { let certified_node = CertifiedNode::new(node, certificate.signatures().to_owned()); - let certified_node_msg = CertifiedNodeMessage::new(certified_node, latest_ledger_info); rb.broadcast(certified_node_msg, cert_ack_set) diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs index 0b23e45a796ba..3a9c35cbff899 100644 --- a/consensus/src/dag/dag_handler.rs +++ b/consensus/src/dag/dag_handler.rs @@ -3,6 +3,7 @@ use super::{ dag_driver::DagDriver, dag_fetcher::{FetchRequestHandler, FetchWaiter}, + dag_state_sync::StateSyncTrigger, types::TDAGMessage, CertifiedNode, Node, }; @@ -23,40 +24,43 @@ use tokio::select; pub(crate) struct NetworkHandler { epoch_state: Arc, - dag_rpc_rx: aptos_channel::Receiver, node_receiver: NodeBroadcastHandler, dag_driver: DagDriver, fetch_receiver: FetchRequestHandler, node_fetch_waiter: FetchWaiter, certified_node_fetch_waiter: FetchWaiter, + state_sync_trigger: StateSyncTrigger, } impl NetworkHandler { - pub fn new( + pub(super) fn new( epoch_state: Arc, - dag_rpc_rx: aptos_channel::Receiver, node_receiver: NodeBroadcastHandler, dag_driver: DagDriver, fetch_receiver: FetchRequestHandler, node_fetch_waiter: FetchWaiter, certified_node_fetch_waiter: FetchWaiter, + state_sync_trigger: StateSyncTrigger, ) -> Self { Self { epoch_state, - dag_rpc_rx, node_receiver, dag_driver, fetch_receiver, node_fetch_waiter, certified_node_fetch_waiter, + state_sync_trigger, } } - pub async fn start(mut self) { + pub async fn start( + mut self, + dag_rpc_rx: &mut aptos_channel::Receiver, + ) { // TODO(ibalajiarun): clean up Reliable Broadcast storage periodically. loop { select! { - Some(msg) = self.dag_rpc_rx.next() => { + Some(msg) = dag_rpc_rx.next() => { if let Err(e) = self.process_rpc(msg).await { warn!(error = ?e, "error processing rpc"); } @@ -90,10 +94,15 @@ impl NetworkHandler { .verify(&self.epoch_state.verifier) .and_then(|_| self.node_receiver.process(node)) .map(|r| r.into()), - DAGMessage::CertifiedNodeMsg(certified_node_msg) => certified_node_msg - .verify(&self.epoch_state.verifier) - .and_then(|_| self.dag_driver.process(certified_node_msg.certified_node())) - .map(|r| r.into()), + DAGMessage::CertifiedNodeMsg(node) => match node.verify(&self.epoch_state.verifier) { + Ok(_) => { + let node = self.state_sync_trigger.check(node).await; + self.dag_driver + .process(node.certified_node()) + .map(|r| r.into()) + }, + Err(e) => Err(e), + }, DAGMessage::FetchRequest(request) => request .verify(&self.epoch_state.verifier) .and_then(|_| self.fetch_receiver.process(request)) @@ -117,5 +126,6 @@ impl NetworkHandler { .response_sender .send(response) .map_err(|_| anyhow::anyhow!("unable to respond to rpc")) + .into() } } diff --git a/consensus/src/dag/dag_state_sync.rs b/consensus/src/dag/dag_state_sync.rs index 82b270291ca60..a98e11136674d 100644 --- a/consensus/src/dag/dag_state_sync.rs +++ b/consensus/src/dag/dag_state_sync.rs @@ -2,11 +2,10 @@ use super::{ adapter::Notifier, - dag_fetcher::{DagFetcher, TDagFetcher}, + dag_fetcher::TDagFetcher, dag_store::Dag, storage::DAGStorage, types::{CertifiedNodeMessage, RemoteFetchRequest}, - TDAGNetworkSender, }; use crate::state_replication::StateComputer; use aptos_consensus_types::common::Round; @@ -24,66 +23,64 @@ use std::sync::Arc; pub const DAG_WINDOW: usize = 1; pub const STATE_SYNC_WINDOW_MULTIPLIER: usize = 30; -pub(super) struct StateSyncManager { - epoch_state: Arc, - network: Arc, - notifier: Arc, - time_service: TimeService, - state_computer: Arc, - storage: Arc, +pub(super) struct StateSyncTrigger { dag_store: Arc>, + downstream_notifier: Arc, + bootstrap_notifier: tokio::sync::mpsc::Sender, } -impl StateSyncManager { - pub fn new( - epoch_state: Arc, - network: Arc, - notifier: Arc, - time_service: TimeService, - state_computer: Arc, - storage: Arc, +impl StateSyncTrigger { + pub(super) fn new( dag_store: Arc>, + downstream_notifier: Arc, + bootstrap_notifier: tokio::sync::mpsc::Sender, ) -> Self { Self { - epoch_state, - network, - notifier, - time_service, - state_computer, - storage, dag_store, + downstream_notifier, + bootstrap_notifier, } } - pub async fn sync_to( - &self, - node: &CertifiedNodeMessage, - ) -> anyhow::Result>>> { - self.sync_to_highest_commit_cert(node.ledger_info()).await; - self.try_sync_to_highest_ordered_anchor(node).await + /// This method checks if a state sync is required, and if so, + /// notifies the bootstraper and yields the current task infinitely, + /// to let the bootstraper can abort this task. + pub(super) async fn check(&self, node: CertifiedNodeMessage) -> CertifiedNodeMessage { + let ledger_info = node.ledger_info(); + + self.notify_commit_proof(ledger_info).await; + + if self.need_sync_for_ledger_info(ledger_info) { + self.bootstrap_notifier.send(node).await; + loop { + tokio::task::yield_now().await; + } + } + node } /// Fast forward in the decoupled-execution pipeline if the block exists there - pub async fn sync_to_highest_commit_cert(&self, ledger_info: &LedgerInfoWithSignatures) { - let send_commit_proof = { - let dag_reader = self.dag_store.read(); - dag_reader.highest_committed_anchor_round() < ledger_info.commit_info().round() - && dag_reader - .highest_ordered_anchor_round() - .unwrap_or_default() - >= ledger_info.commit_info().round() - }; - + async fn notify_commit_proof(&self, ledger_info: &LedgerInfoWithSignatures) { // if the anchor exists between ledger info round and highest ordered round // Note: ledger info round <= highest ordered round - if send_commit_proof { - self.notifier.send_commit_proof(ledger_info.clone()).await + if self.dag_store.read().highest_committed_anchor_round() + < ledger_info.commit_info().round() + && self + .dag_store + .read() + .highest_ordered_anchor_round() + .unwrap_or_default() + >= ledger_info.commit_info().round() + { + self.downstream_notifier + .send_commit_proof(ledger_info.clone()) + .await } } /// Check if we're far away from this ledger info and need to sync. /// This ensures that the block referred by the ledger info is not in buffer manager. - pub fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool { + fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool { let dag_reader = self.dag_store.read(); // check whether if DAG order round is behind the given ledger info round // (meaning consensus is behind) or @@ -97,34 +94,55 @@ impl StateSyncManager { + ((STATE_SYNC_WINDOW_MULTIPLIER * DAG_WINDOW) as Round) < li.commit_info().round() } +} - pub async fn try_sync_to_highest_ordered_anchor( - &self, - node: &CertifiedNodeMessage, - ) -> anyhow::Result>>> { - // Check whether to actually sync - let commit_li = node.ledger_info(); - if !self.need_sync_for_ledger_info(commit_li) { - return Ok(None); - } - - let dag_fetcher = Arc::new(DagFetcher::new( - self.epoch_state.clone(), - self.network.clone(), - self.time_service.clone(), - )); +pub(super) struct DagStateSynchronizer { + epoch_state: Arc, + notifier: Arc, + time_service: TimeService, + state_computer: Arc, + storage: Arc, +} - self.sync_to_highest_ordered_anchor(node, dag_fetcher).await +impl DagStateSynchronizer { + pub fn new( + epoch_state: Arc, + notifier: Arc, + time_service: TimeService, + state_computer: Arc, + storage: Arc, + ) -> Self { + Self { + epoch_state, + notifier, + time_service, + state_computer, + storage, + } } /// Note: Assumes that the sync checks have been done - pub async fn sync_to_highest_ordered_anchor( + pub async fn sync_dag_to( &self, node: &CertifiedNodeMessage, - dag_fetcher: Arc, - ) -> anyhow::Result>>> { + dag_fetcher: impl TDagFetcher, + current_dag_store: Arc>, + ) -> anyhow::Result<()> { let commit_li = node.ledger_info(); + { + let dag_reader = current_dag_store.read(); + assert!( + dag_reader + .highest_ordered_anchor_round() + .unwrap_or_default() + < commit_li.commit_info().round() + || dag_reader.highest_committed_anchor_round() + + ((STATE_SYNC_WINDOW_MULTIPLIER * DAG_WINDOW) as Round) + < commit_li.commit_info().round() + ); + } + if commit_li.ledger_info().ends_epoch() { self.notifier .send_epoch_change(EpochChangeProof::new( @@ -133,7 +151,7 @@ impl StateSyncManager { )) .await; // TODO: make sure to terminate DAG and yield to epoch manager - return Ok(None); + return Ok(()); } // TODO: there is a case where DAG fetches missing nodes in window and a crash happens and when we restart, @@ -179,6 +197,6 @@ impl StateSyncManager { // TODO: the caller should rebootstrap the order rule - Ok(Some(sync_dag_store)) + Ok(()) } } diff --git a/consensus/src/dag/order_rule.rs b/consensus/src/dag/order_rule.rs index 76721b3ef9e29..479be085979ce 100644 --- a/consensus/src/dag/order_rule.rs +++ b/consensus/src/dag/order_rule.rs @@ -21,7 +21,7 @@ pub struct OrderRule { lowest_unordered_anchor_round: Round, dag: Arc>, anchor_election: Box, - notifier: Box, + notifier: Arc, storage: Arc, } diff --git a/consensus/src/dag/tests/dag_driver_tests.rs b/consensus/src/dag/tests/dag_driver_tests.rs index 4aa4fd28f5e96..e2c5868275db2 100644 --- a/consensus/src/dag/tests/dag_driver_tests.rs +++ b/consensus/src/dag/tests/dag_driver_tests.rs @@ -104,7 +104,7 @@ async fn test_certified_node_handler() { LedgerInfo::mock_genesis(None), dag.clone(), Box::new(RoundRobinAnchorElection::new(validators)), - Box::new(TestNotifier { tx }), + Arc::new(TestNotifier { tx }), storage.clone(), ); diff --git a/consensus/src/dag/tests/dag_state_sync_tests.rs b/consensus/src/dag/tests/dag_state_sync_tests.rs index 4644590ef78ce..ec9b16db3d84d 100644 --- a/consensus/src/dag/tests/dag_state_sync_tests.rs +++ b/consensus/src/dag/tests/dag_state_sync_tests.rs @@ -4,7 +4,7 @@ use crate::{ dag::{ adapter::Notifier, dag_fetcher::{FetchRequestHandler, TDagFetcher}, - dag_state_sync::{StateSyncManager, DAG_WINDOW}, + dag_state_sync::DAG_WINDOW, dag_store::Dag, storage::DAGStorage, tests::{dag_test::MockStorage, helpers::generate_dag_nodes}, @@ -100,7 +100,7 @@ struct MockNotifier {} #[async_trait] impl Notifier for MockNotifier { fn send_ordered_nodes( - &mut self, + &self, _ordered_nodes: Vec>, _failed_author: Vec<(Round, Author)>, ) -> anyhow::Result<()> { diff --git a/consensus/src/dag/tests/integration_tests.rs b/consensus/src/dag/tests/integration_tests.rs index 6425edc02e9b2..a812800e213a2 100644 --- a/consensus/src/dag/tests/integration_tests.rs +++ b/consensus/src/dag/tests/integration_tests.rs @@ -2,12 +2,14 @@ use super::dag_test; use crate::{ - dag::bootstrap::bootstrap_dag, + dag::bootstrap::bootstrap_dag_for_test, experimental::buffer_manager::OrderedBlocks, network::{DAGNetworkSenderImpl, IncomingDAGRequest, NetworkSender}, network_interface::{ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC}, network_tests::{NetworkPlayground, TwinId}, - test_utils::{consensus_runtime, MockPayloadManager, MockStorage}, + test_utils::{ + consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStateComputer, MockStorage, + }, }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::network_id::{NetworkId, PeerNetworkId}; @@ -23,6 +25,7 @@ use aptos_network::{ transport::ConnectionMetadata, ProtocolId, }; +use aptos_storage_interface::mock::MockDbReaderWriter; use aptos_time_service::TimeService; use aptos_types::{ epoch_state::EpochState, @@ -38,10 +41,11 @@ use futures::{ use futures_channel::mpsc::UnboundedReceiver; use maplit::hashmap; use std::sync::Arc; +use tokio::task::JoinHandle; struct DagBootstrapUnit { - nh_abort_handle: AbortHandle, - df_abort_handle: AbortHandle, + nh_task_handle: JoinHandle<()>, + df_task_handle: JoinHandle<()>, dag_rpc_tx: aptos_channel::Sender, network_events: Box, aptos_channels::Receiver>>>, @@ -71,22 +75,26 @@ impl DagBootstrapUnit { let payload_client = Arc::new(MockPayloadManager::new(None)); - let (nh_abort_handle, df_abort_handle, dag_rpc_tx, ordered_nodes_rx) = bootstrap_dag( - self_peer, - signer, - Arc::new(epoch_state), - storage.get_ledger_info(), - Arc::new(dag_storage), - network.clone(), - network.clone(), - time_service, - payload_client, - ); + let state_computer = Arc::new(EmptyStateComputer {}); + + let (nh_abort_handle, df_abort_handle, dag_rpc_tx, ordered_nodes_rx) = + bootstrap_dag_for_test( + self_peer, + signer, + Arc::new(epoch_state), + storage.get_ledger_info(), + Arc::new(dag_storage), + network.clone(), + network.clone(), + time_service, + payload_client, + state_computer, + ); ( Self { - nh_abort_handle, - df_abort_handle, + nh_task_handle: nh_abort_handle, + df_task_handle: df_abort_handle, dag_rpc_tx, network_events, }, diff --git a/consensus/src/dag/tests/order_rule_tests.rs b/consensus/src/dag/tests/order_rule_tests.rs index c403820cd89ca..f597a8dc428ce 100644 --- a/consensus/src/dag/tests/order_rule_tests.rs +++ b/consensus/src/dag/tests/order_rule_tests.rs @@ -87,7 +87,7 @@ pub struct TestNotifier { #[async_trait] impl Notifier for TestNotifier { fn send_ordered_nodes( - &mut self, + &self, ordered_nodes: Vec>, _failed_authors: Vec<(Round, Author)>, ) -> anyhow::Result<()> { @@ -118,7 +118,7 @@ fn create_order_rule( ledger_info, dag, anchor_election, - Box::new(TestNotifier { tx }), + Arc::new(TestNotifier { tx }), Arc::new(MockStorage::new()), ), rx, diff --git a/consensus/src/dag/types.rs b/consensus/src/dag/types.rs index bdbbea9ace3b6..927908f83564d 100644 --- a/consensus/src/dag/types.rs +++ b/consensus/src/dag/types.rs @@ -431,13 +431,13 @@ impl CertifiedNodeMessage { } } - pub fn ledger_info(&self) -> &LedgerInfoWithSignatures { - &self.ledger_info - } - pub fn certified_node(self) -> CertifiedNode { self.inner } + + pub fn ledger_info(&self) -> &LedgerInfoWithSignatures { + &self.ledger_info + } } impl Deref for CertifiedNodeMessage { From e642b505125dc1bcf15c9e5826457562c87b4d30 Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Mon, 11 Sep 2023 12:34:50 -0700 Subject: [PATCH 2/6] fix tests --- consensus/src/dag/bootstrap.rs | 18 ++++------ consensus/src/dag/dag_driver.rs | 4 +-- consensus/src/dag/dag_handler.rs | 2 +- consensus/src/dag/dag_state_sync.rs | 12 +++---- .../src/dag/tests/dag_state_sync_tests.rs | 35 +++++++------------ consensus/src/dag/tests/integration_tests.rs | 5 ++- 6 files changed, 29 insertions(+), 47 deletions(-) diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index 45fd5b6dd9d7e..48abb90e69d97 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -24,7 +24,7 @@ use aptos_channels::{ aptos_channel::{self, Receiver}, message_queues::QueueStyle, }; -use aptos_consensus_types::common::{Author, Round}; +use aptos_consensus_types::common::Author; use aptos_crypto::HashValue; use aptos_infallible::RwLock; use aptos_logger::error; @@ -84,14 +84,13 @@ impl DagBootstrapper { fn bootstrap_dag_store( &self, - initial_round: Round, latest_ledger_info: LedgerInfo, notifier: Arc, ) -> (Arc>, OrderRule) { let dag = Arc::new(RwLock::new(Dag::new( self.epoch_state.clone(), self.storage.clone(), - initial_round, + latest_ledger_info.round(), DAG_WINDOW, ))); @@ -177,10 +176,7 @@ impl DagBootstrapper { ordered_nodes_tx: UnboundedSender, shutdown_rx: oneshot::Receiver<()>, ) -> anyhow::Result<()> { - let adapter = Arc::new(NotifierAdapter::new( - ordered_nodes_tx, - self.storage.clone(), - )); + let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, self.storage.clone())); let sync_manager = DagStateSynchronizer::new( self.epoch_state.clone(), @@ -204,7 +200,7 @@ impl DagBootstrapper { // TODO: fix let (dag_store, order_rule) = - self.bootstrap_dag_store(0, ledger_info.ledger_info().clone(), adapter.clone()); + self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone()); let state_sync_trigger = StateSyncTrigger::new( dag_store.clone(), @@ -276,7 +272,7 @@ pub(super) fn bootstrap_dag_for_test( let (rebootstrap_notification_tx, _rebootstrap_notification_rx) = tokio::sync::mpsc::channel(1); let (dag_store, order_rule) = - bootstraper.bootstrap_dag_store(0, latest_ledger_info, adapter.clone()); + bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone()); let state_sync_trigger = StateSyncTrigger::new( dag_store.clone(), @@ -287,9 +283,7 @@ pub(super) fn bootstrap_dag_for_test( let (handler, fetch_service) = bootstraper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger); - let dh_handle = tokio::spawn(async move { - handler.start(&mut dag_rpc_rx).await - }); + let dh_handle = tokio::spawn(async move { handler.start(&mut dag_rpc_rx).await }); let df_handle = tokio::spawn(fetch_service.start()); (dh_handle, df_handle, dag_rpc_tx, ordered_nodes_rx) diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index 84e32feb0d579..8aa498f283ef8 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -23,10 +23,8 @@ use aptos_logger::error; use aptos_reliable_broadcast::ReliableBroadcast; use aptos_time_service::{TimeService, TimeServiceTrait}; use aptos_types::{ - aggregate_signature::AggregateSignature, - block_info::{BlockInfo, Round}, + block_info::{Round}, epoch_state::EpochState, - ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, }; use futures::{ future::{AbortHandle, Abortable}, diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs index 3a9c35cbff899..ddb4620554188 100644 --- a/consensus/src/dag/dag_handler.rs +++ b/consensus/src/dag/dag_handler.rs @@ -96,7 +96,7 @@ impl NetworkHandler { .map(|r| r.into()), DAGMessage::CertifiedNodeMsg(node) => match node.verify(&self.epoch_state.verifier) { Ok(_) => { - let node = self.state_sync_trigger.check(node).await; + let node = self.state_sync_trigger.check(node).await self.dag_driver .process(node.certified_node()) .map(|r| r.into()) diff --git a/consensus/src/dag/dag_state_sync.rs b/consensus/src/dag/dag_state_sync.rs index a98e11136674d..554c9ef5e117b 100644 --- a/consensus/src/dag/dag_state_sync.rs +++ b/consensus/src/dag/dag_state_sync.rs @@ -45,18 +45,18 @@ impl StateSyncTrigger { /// This method checks if a state sync is required, and if so, /// notifies the bootstraper and yields the current task infinitely, /// to let the bootstraper can abort this task. - pub(super) async fn check(&self, node: CertifiedNodeMessage) -> CertifiedNodeMessage { + pub(super) async fn check(&self, node: CertifiedNodeMessage) -> anyhow::Result { let ledger_info = node.ledger_info(); self.notify_commit_proof(ledger_info).await; if self.need_sync_for_ledger_info(ledger_info) { - self.bootstrap_notifier.send(node).await; + self.bootstrap_notifier.send(node).await?; loop { tokio::task::yield_now().await; } } - node + Ok(node) } /// Fast forward in the decoupled-execution pipeline if the block exists there @@ -127,7 +127,7 @@ impl DagStateSynchronizer { node: &CertifiedNodeMessage, dag_fetcher: impl TDagFetcher, current_dag_store: Arc>, - ) -> anyhow::Result<()> { + ) -> anyhow::Result> { let commit_li = node.ledger_info(); { @@ -151,7 +151,7 @@ impl DagStateSynchronizer { )) .await; // TODO: make sure to terminate DAG and yield to epoch manager - return Ok(()); + return Ok(None); } // TODO: there is a case where DAG fetches missing nodes in window and a crash happens and when we restart, @@ -197,6 +197,6 @@ impl DagStateSynchronizer { // TODO: the caller should rebootstrap the order rule - Ok(()) + Ok(Arc::into_inner(sync_dag_store).map(|r| r.into_inner())) } } diff --git a/consensus/src/dag/tests/dag_state_sync_tests.rs b/consensus/src/dag/tests/dag_state_sync_tests.rs index ec9b16db3d84d..24317bc5ce59a 100644 --- a/consensus/src/dag/tests/dag_state_sync_tests.rs +++ b/consensus/src/dag/tests/dag_state_sync_tests.rs @@ -4,7 +4,7 @@ use crate::{ dag::{ adapter::Notifier, dag_fetcher::{FetchRequestHandler, TDagFetcher}, - dag_state_sync::DAG_WINDOW, + dag_state_sync::{DagStateSynchronizer, DAG_WINDOW}, dag_store::Dag, storage::DAGStorage, tests::{dag_test::MockStorage, helpers::generate_dag_nodes}, @@ -112,24 +112,17 @@ impl Notifier for MockNotifier { async fn send_commit_proof(&self, _ledger_info: LedgerInfoWithSignatures) {} } -fn setup( - epoch_state: Arc, - dag_store: Arc>, - storage: Arc, -) -> StateSyncManager { - let network = Arc::new(MockDAGNetworkSender {}); +fn setup(epoch_state: Arc, storage: Arc) -> DagStateSynchronizer { let time_service = TimeService::mock(); let state_computer = Arc::new(EmptyStateComputer {}); - let upstream_notifier = Arc::new(MockNotifier {}); + let downstream_notifier = Arc::new(MockNotifier {}); - StateSyncManager::new( + DagStateSynchronizer::new( epoch_state, - network, - upstream_notifier, + downstream_notifier, time_service, state_computer, storage, - dag_store, ) } @@ -193,24 +186,22 @@ async fn test_dag_state_sync() { let sync_node_li = CertifiedNodeMessage::new(sync_to_node, sync_to_li); - let state_sync = setup(epoch_state.clone(), slow_dag.clone(), storage.clone()); - let dag_fetcher = Arc::new(MockDagFetcher { + let state_sync = setup(epoch_state.clone(), storage.clone()); + let dag_fetcher = MockDagFetcher { target_dag: fast_dag.clone(), epoch_state: epoch_state.clone(), - }); + }; let sync_result = state_sync - .sync_to_highest_ordered_anchor(&sync_node_li, dag_fetcher) + .sync_dag_to(&sync_node_li, dag_fetcher, slow_dag.clone()) .await; let new_dag = sync_result.unwrap().unwrap(); - let dag_reader = new_dag.read(); - - assert_eq!(dag_reader.lowest_round(), (LI_ROUNDS - DAG_WINDOW) as Round); - assert_eq!(dag_reader.highest_round(), (NUM_ROUNDS - 1) as Round); - assert_none!(dag_reader.highest_ordered_anchor_round(),); + assert_eq!(new_dag.lowest_round(), (LI_ROUNDS - DAG_WINDOW) as Round); + assert_eq!(new_dag.highest_round(), (NUM_ROUNDS - 1) as Round); + assert_none!(new_dag.highest_ordered_anchor_round(),); assert_eq!( - dag_reader.highest_committed_anchor_round(), + new_dag.highest_committed_anchor_round(), LI_ROUNDS as Round ); } diff --git a/consensus/src/dag/tests/integration_tests.rs b/consensus/src/dag/tests/integration_tests.rs index a812800e213a2..ee92a0cd8e76b 100644 --- a/consensus/src/dag/tests/integration_tests.rs +++ b/consensus/src/dag/tests/integration_tests.rs @@ -8,7 +8,7 @@ use crate::{ network_interface::{ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC}, network_tests::{NetworkPlayground, TwinId}, test_utils::{ - consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStateComputer, MockStorage, + consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStorage, }, }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; @@ -25,7 +25,6 @@ use aptos_network::{ transport::ConnectionMetadata, ProtocolId, }; -use aptos_storage_interface::mock::MockDbReaderWriter; use aptos_time_service::TimeService; use aptos_types::{ epoch_state::EpochState, @@ -35,7 +34,7 @@ use aptos_types::{ }; use claims::assert_gt; use futures::{ - stream::{select, AbortHandle, Select}, + stream::{select, Select}, StreamExt, }; use futures_channel::mpsc::UnboundedReceiver; From a29438b1c30f9c4c864684eb54e7bf1e01efbaa0 Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Mon, 11 Sep 2023 15:29:26 -0700 Subject: [PATCH 3/6] Return state sync notification instead of channel --- consensus/src/dag/bootstrap.rs | 30 +++--------- consensus/src/dag/dag_driver.rs | 5 +- consensus/src/dag/dag_handler.rs | 49 +++++++++++++------ consensus/src/dag/dag_state_sync.rs | 25 +++++----- .../src/dag/tests/dag_state_sync_tests.rs | 5 +- consensus/src/dag/tests/integration_tests.rs | 8 ++- 6 files changed, 58 insertions(+), 64 deletions(-) diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index 48abb90e69d97..6c3b984f9110e 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -12,7 +12,7 @@ use super::{ order_rule::OrderRule, rb_handler::NodeBroadcastHandler, storage::DAGStorage, - types::DAGMessage, + types::{CertifiedNodeMessage, DAGMessage}, }; use crate::{ dag::{adapter::NotifierAdapter, dag_fetcher::DagFetcher}, @@ -195,18 +195,10 @@ impl DagBootstrapper { let mut shutdown_rx = shutdown_rx.into_stream(); loop { - let (rebootstrap_notification_tx, mut rebootstrap_notification_rx) = - tokio::sync::mpsc::channel(1); - - // TODO: fix let (dag_store, order_rule) = self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone()); - let state_sync_trigger = StateSyncTrigger::new( - dag_store.clone(), - adapter.clone(), - rebootstrap_notification_tx, - ); + let state_sync_trigger = StateSyncTrigger::new(dag_store.clone(), adapter.clone()); let (handler, fetch_service) = self.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger); @@ -221,17 +213,16 @@ impl DagBootstrapper { let _ = df_handle.await; return Ok(()); }, - Some(node) = rebootstrap_notification_rx.recv() => { + certified_node_msg = handler.run(&mut dag_rpc_rx) => { df_handle.abort(); let _ = df_handle.await; let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone()); - if let Err(e) = sync_manager.sync_dag_to(&node, dag_fetcher, dag_store.clone()).await { + if let Err(e) = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone()).await { error!(error = ?e, "unable to sync"); } - }, - _ = handler.start(&mut dag_rpc_rx) => {} + } } } } @@ -249,7 +240,7 @@ pub(super) fn bootstrap_dag_for_test( payload_client: Arc, state_computer: Arc, ) -> ( - JoinHandle<()>, + JoinHandle, JoinHandle<()>, aptos_channel::Sender, UnboundedReceiver, @@ -269,21 +260,16 @@ pub(super) fn bootstrap_dag_for_test( let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded(); let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, storage.clone())); let (dag_rpc_tx, mut dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None); - let (rebootstrap_notification_tx, _rebootstrap_notification_rx) = tokio::sync::mpsc::channel(1); let (dag_store, order_rule) = bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone()); - let state_sync_trigger = StateSyncTrigger::new( - dag_store.clone(), - adapter.clone(), - rebootstrap_notification_tx, - ); + let state_sync_trigger = StateSyncTrigger::new(dag_store.clone(), adapter.clone()); let (handler, fetch_service) = bootstraper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger); - let dh_handle = tokio::spawn(async move { handler.start(&mut dag_rpc_rx).await }); + let dh_handle = tokio::spawn(async move { handler.run(&mut dag_rpc_rx).await }); let df_handle = tokio::spawn(fetch_service.start()); (dh_handle, df_handle, dag_rpc_tx, ordered_nodes_rx) diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index 8aa498f283ef8..7f8c4f535335b 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -22,10 +22,7 @@ use aptos_infallible::RwLock; use aptos_logger::error; use aptos_reliable_broadcast::ReliableBroadcast; use aptos_time_service::{TimeService, TimeServiceTrait}; -use aptos_types::{ - block_info::{Round}, - epoch_state::EpochState, -}; +use aptos_types::{block_info::Round, epoch_state::EpochState}; use futures::{ future::{AbortHandle, Abortable}, FutureExt, diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs index ddb4620554188..d4405f813e137 100644 --- a/consensus/src/dag/dag_handler.rs +++ b/consensus/src/dag/dag_handler.rs @@ -3,8 +3,11 @@ use super::{ dag_driver::DagDriver, dag_fetcher::{FetchRequestHandler, FetchWaiter}, - dag_state_sync::StateSyncTrigger, - types::TDAGMessage, + dag_state_sync::{ + StateSyncStatus::{self, NeedsSync, Synced}, + StateSyncTrigger, + }, + types::{CertifiedNodeMessage, TDAGMessage}, CertifiedNode, Node, }; use crate::{ @@ -53,16 +56,23 @@ impl NetworkHandler { } } - pub async fn start( + pub async fn run( mut self, dag_rpc_rx: &mut aptos_channel::Receiver, - ) { + ) -> CertifiedNodeMessage { // TODO(ibalajiarun): clean up Reliable Broadcast storage periodically. loop { select! { Some(msg) = dag_rpc_rx.next() => { - if let Err(e) = self.process_rpc(msg).await { - warn!(error = ?e, "error processing rpc"); + match self.process_rpc(msg).await { + Ok(sync_status) => { + if let StateSyncStatus::NeedsSync(certified_node_msg) = sync_status { + return certified_node_msg; + } + }, + Err(e) => { + warn!(error = ?e, "error processing rpc"); + } } }, Some(res) = self.node_fetch_waiter.next() => { @@ -79,7 +89,10 @@ impl NetworkHandler { } } - async fn process_rpc(&mut self, rpc_request: IncomingDAGRequest) -> anyhow::Result<()> { + async fn process_rpc( + &mut self, + rpc_request: IncomingDAGRequest, + ) -> anyhow::Result { let dag_message: DAGMessage = rpc_request.req.try_into()?; let author = dag_message @@ -94,14 +107,18 @@ impl NetworkHandler { .verify(&self.epoch_state.verifier) .and_then(|_| self.node_receiver.process(node)) .map(|r| r.into()), - DAGMessage::CertifiedNodeMsg(node) => match node.verify(&self.epoch_state.verifier) { - Ok(_) => { - let node = self.state_sync_trigger.check(node).await - self.dag_driver - .process(node.certified_node()) - .map(|r| r.into()) - }, - Err(e) => Err(e), + DAGMessage::CertifiedNodeMsg(certified_node_msg) => { + match certified_node_msg.verify(&self.epoch_state.verifier) { + Ok(_) => match self.state_sync_trigger.check(certified_node_msg).await { + ret @ (NeedsSync(_), None) => return Ok(ret.0), + (Synced, Some(certified_node_msg)) => self + .dag_driver + .process(certified_node_msg.certified_node()) + .map(|r| r.into()), + _ => unreachable!(), + }, + Err(e) => Err(e), + } }, DAGMessage::FetchRequest(request) => request .verify(&self.epoch_state.verifier) @@ -126,6 +143,6 @@ impl NetworkHandler { .response_sender .send(response) .map_err(|_| anyhow::anyhow!("unable to respond to rpc")) - .into() + .map(|_| StateSyncStatus::Synced) } } diff --git a/consensus/src/dag/dag_state_sync.rs b/consensus/src/dag/dag_state_sync.rs index 554c9ef5e117b..32af4532e2c3a 100644 --- a/consensus/src/dag/dag_state_sync.rs +++ b/consensus/src/dag/dag_state_sync.rs @@ -23,40 +23,39 @@ use std::sync::Arc; pub const DAG_WINDOW: usize = 1; pub const STATE_SYNC_WINDOW_MULTIPLIER: usize = 30; +pub(super) enum StateSyncStatus { + NeedsSync(CertifiedNodeMessage), + Synced, +} + pub(super) struct StateSyncTrigger { dag_store: Arc>, downstream_notifier: Arc, - bootstrap_notifier: tokio::sync::mpsc::Sender, } impl StateSyncTrigger { - pub(super) fn new( - dag_store: Arc>, - downstream_notifier: Arc, - bootstrap_notifier: tokio::sync::mpsc::Sender, - ) -> Self { + pub(super) fn new(dag_store: Arc>, downstream_notifier: Arc) -> Self { Self { dag_store, downstream_notifier, - bootstrap_notifier, } } /// This method checks if a state sync is required, and if so, /// notifies the bootstraper and yields the current task infinitely, /// to let the bootstraper can abort this task. - pub(super) async fn check(&self, node: CertifiedNodeMessage) -> anyhow::Result { + pub(super) async fn check( + &self, + node: CertifiedNodeMessage, + ) -> (StateSyncStatus, Option) { let ledger_info = node.ledger_info(); self.notify_commit_proof(ledger_info).await; if self.need_sync_for_ledger_info(ledger_info) { - self.bootstrap_notifier.send(node).await?; - loop { - tokio::task::yield_now().await; - } + return (StateSyncStatus::NeedsSync(node), None); } - Ok(node) + (StateSyncStatus::Synced, Some(node)) } /// Fast forward in the decoupled-execution pipeline if the block exists there diff --git a/consensus/src/dag/tests/dag_state_sync_tests.rs b/consensus/src/dag/tests/dag_state_sync_tests.rs index 24317bc5ce59a..e47d654ec1b3a 100644 --- a/consensus/src/dag/tests/dag_state_sync_tests.rs +++ b/consensus/src/dag/tests/dag_state_sync_tests.rs @@ -200,8 +200,5 @@ async fn test_dag_state_sync() { assert_eq!(new_dag.lowest_round(), (LI_ROUNDS - DAG_WINDOW) as Round); assert_eq!(new_dag.highest_round(), (NUM_ROUNDS - 1) as Round); assert_none!(new_dag.highest_ordered_anchor_round(),); - assert_eq!( - new_dag.highest_committed_anchor_round(), - LI_ROUNDS as Round - ); + assert_eq!(new_dag.highest_committed_anchor_round(), LI_ROUNDS as Round); } diff --git a/consensus/src/dag/tests/integration_tests.rs b/consensus/src/dag/tests/integration_tests.rs index ee92a0cd8e76b..ace3e4d0941b2 100644 --- a/consensus/src/dag/tests/integration_tests.rs +++ b/consensus/src/dag/tests/integration_tests.rs @@ -2,14 +2,12 @@ use super::dag_test; use crate::{ - dag::bootstrap::bootstrap_dag_for_test, + dag::{bootstrap::bootstrap_dag_for_test, types::CertifiedNodeMessage}, experimental::buffer_manager::OrderedBlocks, network::{DAGNetworkSenderImpl, IncomingDAGRequest, NetworkSender}, network_interface::{ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC}, network_tests::{NetworkPlayground, TwinId}, - test_utils::{ - consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStorage, - }, + test_utils::{consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStorage}, }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; use aptos_config::network_id::{NetworkId, PeerNetworkId}; @@ -43,7 +41,7 @@ use std::sync::Arc; use tokio::task::JoinHandle; struct DagBootstrapUnit { - nh_task_handle: JoinHandle<()>, + nh_task_handle: JoinHandle, df_task_handle: JoinHandle<()>, dag_rpc_tx: aptos_channel::Sender, network_events: From df5a89d12f9fd90f44024f87830273c4ce8a7878 Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Thu, 14 Sep 2023 13:36:42 -0700 Subject: [PATCH 4/6] feedback --- consensus/src/dag/bootstrap.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index 6c3b984f9110e..fa148cfe85a17 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -16,7 +16,7 @@ use super::{ }; use crate::{ dag::{adapter::NotifierAdapter, dag_fetcher::DagFetcher}, - experimental::buffer_manager::OrderedBlocks, + experimental::{buffer_manager::OrderedBlocks, commit_reliable_broadcast::DropGuard}, network::IncomingDAGRequest, state_replication::{PayloadClient, StateComputer}, }; @@ -36,7 +36,10 @@ use aptos_types::{ ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, validator_signer::ValidatorSigner, }; -use futures::{FutureExt, StreamExt}; +use futures::{ + stream::{AbortHandle, Abortable}, + FutureExt, StreamExt, +}; use futures_channel::{ mpsc::{UnboundedReceiver, UnboundedSender}, oneshot, @@ -174,8 +177,8 @@ impl DagBootstrapper { self, mut dag_rpc_rx: Receiver, ordered_nodes_tx: UnboundedSender, - shutdown_rx: oneshot::Receiver<()>, - ) -> anyhow::Result<()> { + mut shutdown_rx: oneshot::Receiver<()>, + ) { let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, self.storage.clone())); let sync_manager = DagStateSynchronizer::new( @@ -192,8 +195,6 @@ impl DagBootstrapper { AggregateSignature::empty(), ); - let mut shutdown_rx = shutdown_rx.into_stream(); - loop { let (dag_store, order_rule) = self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone()); @@ -208,10 +209,10 @@ impl DagBootstrapper { // poll the network handler while waiting for rebootstrap notification or shutdown notification select! { biased; - _ = shutdown_rx.select_next_some() => { + _ = &mut shutdown_rx => { df_handle.abort(); let _ = df_handle.await; - return Ok(()); + return; }, certified_node_msg = handler.run(&mut dag_rpc_rx) => { df_handle.abort(); From d85cfd6bdf4b6e7167c9363ee27b48adf2b720f3 Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Thu, 14 Sep 2023 20:50:13 -0700 Subject: [PATCH 5/6] lint --- consensus/src/dag/bootstrap.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index fa148cfe85a17..605175a914ae3 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -16,7 +16,7 @@ use super::{ }; use crate::{ dag::{adapter::NotifierAdapter, dag_fetcher::DagFetcher}, - experimental::{buffer_manager::OrderedBlocks, commit_reliable_broadcast::DropGuard}, + experimental::buffer_manager::OrderedBlocks, network::IncomingDAGRequest, state_replication::{PayloadClient, StateComputer}, }; @@ -36,10 +36,6 @@ use aptos_types::{ ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, validator_signer::ValidatorSigner, }; -use futures::{ - stream::{AbortHandle, Abortable}, - FutureExt, StreamExt, -}; use futures_channel::{ mpsc::{UnboundedReceiver, UnboundedSender}, oneshot, From 3318585eae993e20a272c2f33de24bec49f75c65 Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Fri, 15 Sep 2023 08:54:11 -0700 Subject: [PATCH 6/6] rebase fix --- consensus/src/dag/bootstrap.rs | 6 +++--- consensus/src/dag/order_rule.rs | 2 +- consensus/src/dag/rb_handler.rs | 4 ++-- consensus/src/dag/tests/rb_handler_tests.rs | 4 ++++ 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index 605175a914ae3..0e152a697258e 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -46,7 +46,7 @@ use tokio_retry::strategy::ExponentialBackoff; struct DagBootstrapper { self_peer: Author, - signer: ValidatorSigner, + signer: Arc, epoch_state: Arc, storage: Arc, rb_network_sender: Arc>, @@ -59,7 +59,7 @@ struct DagBootstrapper { impl DagBootstrapper { fn new( self_peer: Author, - signer: ValidatorSigner, + signer: Arc, epoch_state: Arc, storage: Arc, rb_network_sender: Arc>, @@ -244,7 +244,7 @@ pub(super) fn bootstrap_dag_for_test( ) { let bootstraper = DagBootstrapper::new( self_peer, - signer, + signer.into(), epoch_state, storage.clone(), rb_network_sender, diff --git a/consensus/src/dag/order_rule.rs b/consensus/src/dag/order_rule.rs index 479be085979ce..15478c9830da3 100644 --- a/consensus/src/dag/order_rule.rs +++ b/consensus/src/dag/order_rule.rs @@ -31,7 +31,7 @@ impl OrderRule { latest_ledger_info: LedgerInfo, dag: Arc>, mut anchor_election: Box, - notifier: Box, + notifier: Arc, storage: Arc, ) -> Self { let committed_round = if latest_ledger_info.ends_epoch() { diff --git a/consensus/src/dag/rb_handler.rs b/consensus/src/dag/rb_handler.rs index cf740f9db56be..19e839f2a9741 100644 --- a/consensus/src/dag/rb_handler.rs +++ b/consensus/src/dag/rb_handler.rs @@ -28,7 +28,7 @@ pub enum NodeBroadcastHandleError { pub(crate) struct NodeBroadcastHandler { dag: Arc>, votes_by_round_peer: BTreeMap>, - signer: ValidatorSigner, + signer: Arc, epoch_state: Arc, storage: Arc, fetch_requester: Arc, @@ -37,7 +37,7 @@ pub(crate) struct NodeBroadcastHandler { impl NodeBroadcastHandler { pub fn new( dag: Arc>, - signer: ValidatorSigner, + signer: Arc, epoch_state: Arc, storage: Arc, fetch_requester: Arc, diff --git a/consensus/src/dag/tests/rb_handler_tests.rs b/consensus/src/dag/tests/rb_handler_tests.rs index d49503c5d1f4f..f79b1cd2f471a 100644 --- a/consensus/src/dag/tests/rb_handler_tests.rs +++ b/consensus/src/dag/tests/rb_handler_tests.rs @@ -38,6 +38,7 @@ async fn test_node_broadcast_receiver_succeed() { epoch: 1, verifier: validator_verifier.clone(), }); + let signers: Vec<_> = signers.into_iter().map(Arc::new).collect(); // Scenario: Start DAG from beginning let storage = Arc::new(MockStorage::new()); @@ -80,6 +81,7 @@ async fn test_node_broadcast_receiver_failure() { epoch: 1, verifier: validator_verifier.clone(), }); + let signers: Vec<_> = signers.into_iter().map(Arc::new).collect(); let mut rb_receivers: Vec<_> = signers .iter() @@ -156,10 +158,12 @@ async fn test_node_broadcast_receiver_failure() { #[test] fn test_node_broadcast_receiver_storage() { let (signers, validator_verifier) = random_validator_verifier(4, None, false); + let signers: Vec<_> = signers.into_iter().map(Arc::new).collect(); let epoch_state = Arc::new(EpochState { epoch: 1, verifier: validator_verifier, }); + let storage = Arc::new(MockStorage::new()); let dag = Arc::new(RwLock::new(Dag::new( epoch_state.clone(),