diff --git a/consensus/src/dag/anchor_election.rs b/consensus/src/dag/anchor_election.rs index c04bbf6998048..ff7c45dc5a72b 100644 --- a/consensus/src/dag/anchor_election.rs +++ b/consensus/src/dag/anchor_election.rs @@ -3,7 +3,7 @@ use aptos_consensus_types::common::{Author, Round}; -pub trait AnchorElection { +pub trait AnchorElection: Send { fn get_anchor(&self, round: Round) -> Author; fn commit(&mut self, round: Round); diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs new file mode 100644 index 0000000000000..20d63e708a84c --- /dev/null +++ b/consensus/src/dag/bootstrap.rs @@ -0,0 +1,117 @@ +// Copyright © Aptos Foundation + +use super::{ + anchor_election::RoundRobinAnchorElection, + dag_driver::DagDriver, + dag_fetcher::{DagFetcher, FetchRequestHandler}, + dag_handler::NetworkHandler, + dag_network::DAGNetworkSender, + dag_store::Dag, + order_rule::OrderRule, + rb_handler::NodeBroadcastHandler, + storage::DAGStorage, + types::DAGMessage, + CertifiedNode, +}; +use crate::{ + network::IncomingDAGRequest, state_replication::PayloadClient, util::time_service::TimeService, +}; +use aptos_channels::{aptos_channel, message_queues::QueueStyle}; +use aptos_consensus_types::common::Author; +use aptos_infallible::RwLock; +use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast}; +use aptos_types::{ + epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner, +}; +use futures::stream::{AbortHandle, Abortable}; +use std::sync::Arc; +use tokio_retry::strategy::ExponentialBackoff; + +pub fn bootstrap_dag( + self_peer: Author, + signer: ValidatorSigner, + epoch_state: Arc, + latest_ledger_info: LedgerInfo, + storage: Arc, + rb_network_sender: Arc>, + dag_network_sender: Arc, + time_service: Arc, + aptos_time_service: aptos_time_service::TimeService, + payload_client: Arc, +) -> ( + AbortHandle, + AbortHandle, + aptos_channel::Sender, + futures_channel::mpsc::UnboundedReceiver>>, +) { + let validators = epoch_state.verifier.get_ordered_account_addresses(); + let current_round = latest_ledger_info.round(); + + let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded(); + let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None); + + // A backoff policy that starts at 100ms and doubles each iteration. + let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50); + let rb = Arc::new(ReliableBroadcast::new( + validators.clone(), + rb_network_sender, + rb_backoff_policy, + aptos_time_service.clone(), + )); + + let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone()))); + + let anchor_election = Box::new(RoundRobinAnchorElection::new(validators)); + let order_rule = OrderRule::new( + epoch_state.clone(), + latest_ledger_info, + dag.clone(), + anchor_election, + ordered_nodes_tx, + ); + + let (dag_fetcher, fetch_requester) = DagFetcher::new( + epoch_state.clone(), + dag_network_sender, + dag.clone(), + aptos_time_service, + ); + let fetch_requester = Arc::new(fetch_requester); + + let dag_driver = DagDriver::new( + self_peer, + epoch_state.clone(), + dag.clone(), + payload_client, + rb, + current_round, + time_service, + storage.clone(), + order_rule, + fetch_requester, + ); + let rb_handler = + NodeBroadcastHandler::new(dag.clone(), signer, epoch_state.clone(), storage.clone()); + let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone()); + + let dag_handler = NetworkHandler::new( + epoch_state, + dag_rpc_rx, + rb_handler, + dag_driver, + fetch_handler, + ); + + let (nh_abort_handle, nh_abort_registration) = AbortHandle::new_pair(); + let (df_abort_handle, df_abort_registration) = AbortHandle::new_pair(); + + tokio::spawn(Abortable::new(dag_handler.start(), nh_abort_registration)); + tokio::spawn(Abortable::new(dag_fetcher.start(), df_abort_registration)); + + ( + nh_abort_handle, + df_abort_handle, + dag_rpc_tx, + ordered_nodes_rx, + ) +} diff --git a/consensus/src/dag/dag_fetcher.rs b/consensus/src/dag/dag_fetcher.rs index db3da1f9630cf..9c0d31f2eb61a 100644 --- a/consensus/src/dag/dag_fetcher.rs +++ b/consensus/src/dag/dag_fetcher.rs @@ -14,9 +14,12 @@ use aptos_logger::error; use aptos_time_service::TimeService; use aptos_types::epoch_state::EpochState; use futures::{stream::FuturesUnordered, StreamExt}; -use tokio::sync::{oneshot, mpsc::{Sender, Receiver}}; use std::{collections::HashMap, sync::Arc, time::Duration}; use thiserror::Error as ThisError; +use tokio::sync::{ + mpsc::{Receiver, Sender}, + oneshot, +}; pub struct FetchRequester { request_tx: Sender, diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs index af65806e00b1b..6c831b995dcb7 100644 --- a/consensus/src/dag/dag_handler.rs +++ b/consensus/src/dag/dag_handler.rs @@ -1,95 +1,46 @@ // Copyright © Aptos Foundation -use super::{ - dag_driver::DagDriver, - dag_fetcher::{DagFetcher, FetchRequestHandler}, - dag_network::DAGNetworkSender, - order_rule::OrderRule, - storage::DAGStorage, - types::TDAGMessage, -}; +use super::{dag_driver::DagDriver, dag_fetcher::FetchRequestHandler, types::TDAGMessage}; use crate::{ - dag::{ - dag_network::RpcHandler, dag_store::Dag, rb_handler::NodeBroadcastHandler, - types::DAGMessage, - }, + dag::{dag_network::RpcHandler, rb_handler::NodeBroadcastHandler, types::DAGMessage}, network::{IncomingDAGRequest, TConsensusMsg}, - state_replication::PayloadClient, - util::time_service::TimeService, }; use anyhow::bail; use aptos_channels::aptos_channel; use aptos_consensus_types::common::Author; -use aptos_infallible::RwLock; use aptos_logger::{error, warn}; use aptos_network::protocols::network::RpcError; -use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast}; -use aptos_types::{epoch_state::EpochState, validator_signer::ValidatorSigner}; +use aptos_types::epoch_state::EpochState; use bytes::Bytes; use futures::StreamExt; use std::sync::Arc; -use tokio_retry::strategy::ExponentialBackoff; -struct NetworkHandler { +pub(crate) struct NetworkHandler { + epoch_state: Arc, dag_rpc_rx: aptos_channel::Receiver, node_receiver: NodeBroadcastHandler, dag_driver: DagDriver, fetch_receiver: FetchRequestHandler, - epoch_state: Arc, } impl NetworkHandler { - fn new( - dag: Arc>, - dag_rpc_rx: aptos_channel::Receiver, - signer: ValidatorSigner, + pub fn new( epoch_state: Arc, - storage: Arc, - payload_client: Arc, - dag_network_sender: Arc, - rb_network_sender: Arc>, - time_service: Arc, - aptos_time_service: aptos_time_service::TimeService, - order_rule: OrderRule, + dag_rpc_rx: aptos_channel::Receiver, + node_receiver: NodeBroadcastHandler, + dag_driver: DagDriver, + fetch_receiver: FetchRequestHandler, ) -> Self { - let rb = Arc::new(ReliableBroadcast::new( - epoch_state.verifier.get_ordered_account_addresses().clone(), - rb_network_sender, - ExponentialBackoff::from_millis(10), - aptos_time_service.clone(), - )); - let (dag_fetcher, fetch_requester) = DagFetcher::new( - epoch_state.clone(), - dag_network_sender, - dag.clone(), - aptos_time_service, - ); Self { + epoch_state, dag_rpc_rx, - node_receiver: NodeBroadcastHandler::new( - dag.clone(), - signer.clone(), - epoch_state.clone(), - storage.clone(), - ), - dag_driver: DagDriver::new( - signer.author(), - epoch_state.clone(), - dag.clone(), - payload_client, - rb, - 1, - time_service, - storage, - order_rule, - Arc::new(fetch_requester), - ), - epoch_state: epoch_state.clone(), - fetch_receiver: FetchRequestHandler::new(dag, epoch_state), + node_receiver, + dag_driver, + fetch_receiver, } } - async fn start(mut self) { + pub async fn start(mut self) { self.dag_driver.try_enter_new_round(); // TODO(ibalajiarun): clean up Reliable Broadcast storage periodically. diff --git a/consensus/src/dag/mod.rs b/consensus/src/dag/mod.rs index b05f59d86d778..7b96f4384e315 100644 --- a/consensus/src/dag/mod.rs +++ b/consensus/src/dag/mod.rs @@ -3,6 +3,7 @@ #![allow(dead_code)] mod anchor_election; +mod bootstrap; mod dag_driver; mod dag_fetcher; mod dag_handler; diff --git a/consensus/src/dag/rb_handler.rs b/consensus/src/dag/rb_handler.rs index 363218978777b..09bddf19623be 100644 --- a/consensus/src/dag/rb_handler.rs +++ b/consensus/src/dag/rb_handler.rs @@ -25,7 +25,7 @@ pub enum NodeBroadcastHandleError { NotEnoughParents, } -pub struct NodeBroadcastHandler { +pub(crate) struct NodeBroadcastHandler { dag: Arc>, votes_by_round_peer: BTreeMap>, signer: ValidatorSigner, diff --git a/consensus/src/dag/tests/dag_driver_tests.rs b/consensus/src/dag/tests/dag_driver_tests.rs index df37ae10d9705..641e27ecf5287 100644 --- a/consensus/src/dag/tests/dag_driver_tests.rs +++ b/consensus/src/dag/tests/dag_driver_tests.rs @@ -2,13 +2,15 @@ use crate::{ dag::{ + anchor_election::RoundRobinAnchorElection, dag_driver::{DagDriver, DagDriverError}, + dag_fetcher::FetchRequester, dag_network::{DAGNetworkSender, RpcWithFallback}, dag_store::Dag, + order_rule::OrderRule, tests::{dag_test::MockStorage, helpers::new_certified_node}, types::{CertifiedAck, DAGMessage}, - RpcHandler, order_rule::OrderRule, - anchor_election::RoundRobinAnchorElection, dag_fetcher::FetchRequester, + RpcHandler, }, test_utils::MockPayloadManager, util::mock_time_service::SimulatedTimeService, @@ -16,7 +18,9 @@ use crate::{ use aptos_consensus_types::common::Author; use aptos_infallible::RwLock; use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast}; -use aptos_types::{epoch_state::EpochState, validator_verifier::random_validator_verifier, ledger_info::LedgerInfo}; +use aptos_types::{ + epoch_state::EpochState, ledger_info::LedgerInfo, validator_verifier::random_validator_verifier, +}; use async_trait::async_trait; use claims::{assert_ok, assert_ok_eq}; use std::{sync::Arc, time::Duration}; @@ -81,7 +85,13 @@ fn test_certified_node_handler() { let time_service = Arc::new(SimulatedTimeService::new()); let (ordered_nodes_sender, _) = futures_channel::mpsc::unbounded(); let validators = signers.iter().map(|vs| vs.author()).collect(); - let order_rule = OrderRule::new(epoch_state.clone(), LedgerInfo::mock_genesis(None), dag.clone(), Box::new(RoundRobinAnchorElection::new(validators)), ordered_nodes_sender); + let order_rule = OrderRule::new( + epoch_state.clone(), + LedgerInfo::mock_genesis(None), + dag.clone(), + Box::new(RoundRobinAnchorElection::new(validators)), + ordered_nodes_sender, + ); let (request_tx, _) = tokio::sync::mpsc::channel(10); let fetch_requester = Arc::new(FetchRequester::new(request_tx));