diff --git a/consensus/src/dag/anchor_election.rs b/consensus/src/dag/anchor_election.rs index c04bbf6998048..ff7c45dc5a72b 100644 --- a/consensus/src/dag/anchor_election.rs +++ b/consensus/src/dag/anchor_election.rs @@ -3,7 +3,7 @@ use aptos_consensus_types::common::{Author, Round}; -pub trait AnchorElection { +pub trait AnchorElection: Send { fn get_anchor(&self, round: Round) -> Author; fn commit(&mut self, round: Round); diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs new file mode 100644 index 0000000000000..3d7c0289b8e84 --- /dev/null +++ b/consensus/src/dag/bootstrap.rs @@ -0,0 +1,119 @@ +// Copyright © Aptos Foundation + +use super::{ + anchor_election::RoundRobinAnchorElection, + dag_driver::DagDriver, + dag_fetcher::{DagFetcher, FetchRequestHandler}, + dag_handler::NetworkHandler, + dag_network::DAGNetworkSender, + dag_store::Dag, + order_rule::OrderRule, + rb_handler::NodeBroadcastHandler, + storage::DAGStorage, + types::DAGMessage, + CertifiedNode, +}; +use crate::{ + network::IncomingDAGRequest, state_replication::PayloadClient, util::time_service::TimeService, +}; +use aptos_channels::{aptos_channel, message_queues::QueueStyle}; +use aptos_consensus_types::common::Author; +use aptos_infallible::RwLock; +use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast}; +use aptos_types::{ + epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner, +}; +use futures::stream::{AbortHandle, Abortable}; +use std::sync::Arc; +use tokio_retry::strategy::ExponentialBackoff; + +pub fn bootstrap_dag( + self_peer: Author, + signer: ValidatorSigner, + epoch_state: Arc, + latest_ledger_info: LedgerInfo, + storage: Arc, + rb_network_sender: Arc>, + dag_network_sender: Arc, + time_service: aptos_time_service::TimeService, + payload_client: Arc, +) -> ( + AbortHandle, + AbortHandle, + aptos_channel::Sender, + futures_channel::mpsc::UnboundedReceiver>>, +) { + let validators = epoch_state.verifier.get_ordered_account_addresses(); + let current_round = latest_ledger_info.round(); + + let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded(); + let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None); + + // A backoff policy that starts at 100ms and doubles each iteration. + let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50); + let rb = Arc::new(ReliableBroadcast::new( + validators.clone(), + rb_network_sender, + rb_backoff_policy, + time_service.clone(), + )); + + let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone()))); + + let anchor_election = Box::new(RoundRobinAnchorElection::new(validators)); + let order_rule = OrderRule::new( + epoch_state.clone(), + latest_ledger_info, + dag.clone(), + anchor_election, + ordered_nodes_tx, + ); + + let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) = + DagFetcher::new( + epoch_state.clone(), + dag_network_sender, + dag.clone(), + time_service.clone(), + ); + let fetch_requester = Arc::new(fetch_requester); + + let dag_driver = DagDriver::new( + self_peer, + epoch_state.clone(), + dag.clone(), + payload_client, + rb, + current_round, + time_service, + storage.clone(), + order_rule, + fetch_requester, + ); + let rb_handler = + NodeBroadcastHandler::new(dag.clone(), signer, epoch_state.clone(), storage.clone()); + let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone()); + + let dag_handler = NetworkHandler::new( + epoch_state, + dag_rpc_rx, + rb_handler, + dag_driver, + fetch_handler, + node_fetch_waiter, + certified_node_fetch_waiter, + ); + + let (nh_abort_handle, nh_abort_registration) = AbortHandle::new_pair(); + let (df_abort_handle, df_abort_registration) = AbortHandle::new_pair(); + + tokio::spawn(Abortable::new(dag_handler.start(), nh_abort_registration)); + tokio::spawn(Abortable::new(dag_fetcher.start(), df_abort_registration)); + + ( + nh_abort_handle, + df_abort_handle, + dag_rpc_tx, + ordered_nodes_rx, + ) +} diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs index 9e67ab0f63640..e1da8eb1a1b6e 100644 --- a/consensus/src/dag/dag_handler.rs +++ b/consensus/src/dag/dag_handler.rs @@ -2,101 +2,57 @@ use super::{ dag_driver::DagDriver, - dag_fetcher::{DagFetcher, FetchRequestHandler, FetchWaiter}, - dag_network::DAGNetworkSender, - order_rule::OrderRule, - storage::DAGStorage, + dag_fetcher::{FetchRequestHandler, FetchWaiter}, types::TDAGMessage, CertifiedNode, Node, }; use crate::{ - dag::{ - dag_network::RpcHandler, dag_store::Dag, rb_handler::NodeBroadcastHandler, - types::DAGMessage, - }, + dag::{dag_network::RpcHandler, rb_handler::NodeBroadcastHandler, types::DAGMessage}, network::{IncomingDAGRequest, TConsensusMsg}, - state_replication::PayloadClient, }; use anyhow::bail; use aptos_channels::aptos_channel; use aptos_consensus_types::common::Author; -use aptos_infallible::RwLock; use aptos_logger::{error, warn}; use aptos_network::protocols::network::RpcError; -use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast}; -use aptos_time_service::TimeService; -use aptos_types::{epoch_state::EpochState, validator_signer::ValidatorSigner}; +use aptos_types::epoch_state::EpochState; use bytes::Bytes; use futures::StreamExt; use std::sync::Arc; use tokio::select; -use tokio_retry::strategy::ExponentialBackoff; -struct NetworkHandler { +pub(crate) struct NetworkHandler { + epoch_state: Arc, dag_rpc_rx: aptos_channel::Receiver, node_receiver: NodeBroadcastHandler, dag_driver: DagDriver, fetch_receiver: FetchRequestHandler, - epoch_state: Arc, node_fetch_waiter: FetchWaiter, certified_node_fetch_waiter: FetchWaiter, } impl NetworkHandler { - fn new( - dag: Arc>, - dag_rpc_rx: aptos_channel::Receiver, - signer: ValidatorSigner, + pub fn new( epoch_state: Arc, - storage: Arc, - payload_client: Arc, - dag_network_sender: Arc, - rb_network_sender: Arc>, - time_service: TimeService, - order_rule: OrderRule, + dag_rpc_rx: aptos_channel::Receiver, + node_receiver: NodeBroadcastHandler, + dag_driver: DagDriver, + fetch_receiver: FetchRequestHandler, + node_fetch_waiter: FetchWaiter, + certified_node_fetch_waiter: FetchWaiter, ) -> Self { - let rb = Arc::new(ReliableBroadcast::new( - epoch_state.verifier.get_ordered_account_addresses().clone(), - rb_network_sender, - ExponentialBackoff::from_millis(10), - time_service.clone(), - )); - let (_dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) = - DagFetcher::new( - epoch_state.clone(), - dag_network_sender, - dag.clone(), - time_service.clone(), - ); - let fetch_requester = Arc::new(fetch_requester); Self { + epoch_state, dag_rpc_rx, - node_receiver: NodeBroadcastHandler::new( - dag.clone(), - signer.clone(), - epoch_state.clone(), - storage.clone(), - ), - dag_driver: DagDriver::new( - signer.author(), - epoch_state.clone(), - dag.clone(), - payload_client, - rb, - 1, - time_service, - storage, - order_rule, - fetch_requester, - ), - epoch_state: epoch_state.clone(), - fetch_receiver: FetchRequestHandler::new(dag, epoch_state), + node_receiver, + dag_driver, + fetch_receiver, node_fetch_waiter, certified_node_fetch_waiter, } } - async fn start(mut self) { + pub async fn start(mut self) { self.dag_driver.try_enter_new_round(); // TODO(ibalajiarun): clean up Reliable Broadcast storage periodically. diff --git a/consensus/src/dag/mod.rs b/consensus/src/dag/mod.rs index b05f59d86d778..7b96f4384e315 100644 --- a/consensus/src/dag/mod.rs +++ b/consensus/src/dag/mod.rs @@ -3,6 +3,7 @@ #![allow(dead_code)] mod anchor_election; +mod bootstrap; mod dag_driver; mod dag_fetcher; mod dag_handler; diff --git a/consensus/src/dag/rb_handler.rs b/consensus/src/dag/rb_handler.rs index 363218978777b..09bddf19623be 100644 --- a/consensus/src/dag/rb_handler.rs +++ b/consensus/src/dag/rb_handler.rs @@ -25,7 +25,7 @@ pub enum NodeBroadcastHandleError { NotEnoughParents, } -pub struct NodeBroadcastHandler { +pub(crate) struct NodeBroadcastHandler { dag: Arc>, votes_by_round_peer: BTreeMap>, signer: ValidatorSigner,