Skip to content

Commit

Permalink
[dag] bootstrap logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 11, 2023
1 parent da45aca commit 632fc39
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 63 deletions.
2 changes: 1 addition & 1 deletion consensus/src/dag/anchor_election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use aptos_consensus_types::common::{Author, Round};

pub trait AnchorElection {
pub trait AnchorElection: Send {
fn get_anchor(&self, round: Round) -> Author;

fn commit(&mut self, round: Round);
Expand Down
119 changes: 119 additions & 0 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright © Aptos Foundation

use super::{
anchor_election::RoundRobinAnchorElection,
dag_driver::DagDriver,
dag_fetcher::{DagFetcher, FetchRequestHandler},
dag_handler::NetworkHandler,
dag_network::DAGNetworkSender,
dag_store::Dag,
order_rule::OrderRule,
rb_handler::NodeBroadcastHandler,
storage::DAGStorage,
types::DAGMessage,
CertifiedNode,
};
use crate::{
network::IncomingDAGRequest, state_replication::PayloadClient, util::time_service::TimeService,
};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_types::{
epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner,
};
use futures::stream::{AbortHandle, Abortable};
use std::sync::Arc;
use tokio_retry::strategy::ExponentialBackoff;

pub fn bootstrap_dag(
self_peer: Author,
signer: ValidatorSigner,
epoch_state: Arc<EpochState>,
latest_ledger_info: LedgerInfo,
storage: Arc<dyn DAGStorage>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
dag_network_sender: Arc<dyn DAGNetworkSender>,
time_service: aptos_time_service::TimeService,
payload_client: Arc<dyn PayloadClient>,
) -> (
AbortHandle,
AbortHandle,
aptos_channel::Sender<Author, IncomingDAGRequest>,
futures_channel::mpsc::UnboundedReceiver<Vec<Arc<CertifiedNode>>>,
) {
let validators = epoch_state.verifier.get_ordered_account_addresses();
let current_round = latest_ledger_info.round();

let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);

// A backoff policy that starts at 100ms and doubles each iteration.
let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50);
let rb = Arc::new(ReliableBroadcast::new(
validators.clone(),
rb_network_sender,
rb_backoff_policy,
time_service.clone(),
));

let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone())));

let anchor_election = Box::new(RoundRobinAnchorElection::new(validators));
let order_rule = OrderRule::new(
epoch_state.clone(),
latest_ledger_info,
dag.clone(),
anchor_election,
ordered_nodes_tx,
);

let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) =
DagFetcher::new(
epoch_state.clone(),
dag_network_sender,
dag.clone(),
time_service.clone(),
);
let fetch_requester = Arc::new(fetch_requester);

let dag_driver = DagDriver::new(
self_peer,
epoch_state.clone(),
dag.clone(),
payload_client,
rb,
current_round,
time_service,
storage.clone(),
order_rule,
fetch_requester,
);
let rb_handler =
NodeBroadcastHandler::new(dag.clone(), signer, epoch_state.clone(), storage.clone());
let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone());

let dag_handler = NetworkHandler::new(
epoch_state,
dag_rpc_rx,
rb_handler,
dag_driver,
fetch_handler,
node_fetch_waiter,
certified_node_fetch_waiter,
);

let (nh_abort_handle, nh_abort_registration) = AbortHandle::new_pair();
let (df_abort_handle, df_abort_registration) = AbortHandle::new_pair();

tokio::spawn(Abortable::new(dag_handler.start(), nh_abort_registration));
tokio::spawn(Abortable::new(dag_fetcher.start(), df_abort_registration));

(
nh_abort_handle,
df_abort_handle,
dag_rpc_tx,
ordered_nodes_rx,
)
}
78 changes: 17 additions & 61 deletions consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,101 +2,57 @@

use super::{
dag_driver::DagDriver,
dag_fetcher::{DagFetcher, FetchRequestHandler, FetchWaiter},
dag_network::DAGNetworkSender,
order_rule::OrderRule,
storage::DAGStorage,
dag_fetcher::{FetchRequestHandler, FetchWaiter},
types::TDAGMessage,
CertifiedNode, Node,
};
use crate::{
dag::{
dag_network::RpcHandler, dag_store::Dag, rb_handler::NodeBroadcastHandler,
types::DAGMessage,
},
dag::{dag_network::RpcHandler, rb_handler::NodeBroadcastHandler, types::DAGMessage},
network::{IncomingDAGRequest, TConsensusMsg},
state_replication::PayloadClient,
};
use anyhow::bail;
use aptos_channels::aptos_channel;
use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_logger::{error, warn};
use aptos_network::protocols::network::RpcError;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_time_service::TimeService;
use aptos_types::{epoch_state::EpochState, validator_signer::ValidatorSigner};
use aptos_types::epoch_state::EpochState;
use bytes::Bytes;
use futures::StreamExt;
use std::sync::Arc;
use tokio::select;
use tokio_retry::strategy::ExponentialBackoff;

struct NetworkHandler {
pub(crate) struct NetworkHandler {
epoch_state: Arc<EpochState>,
dag_rpc_rx: aptos_channel::Receiver<Author, IncomingDAGRequest>,
node_receiver: NodeBroadcastHandler,
dag_driver: DagDriver,
fetch_receiver: FetchRequestHandler,
epoch_state: Arc<EpochState>,
node_fetch_waiter: FetchWaiter<Node>,
certified_node_fetch_waiter: FetchWaiter<CertifiedNode>,
}

impl NetworkHandler {
fn new(
dag: Arc<RwLock<Dag>>,
dag_rpc_rx: aptos_channel::Receiver<Author, IncomingDAGRequest>,
signer: ValidatorSigner,
pub fn new(
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
payload_client: Arc<dyn PayloadClient>,
dag_network_sender: Arc<dyn DAGNetworkSender>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
time_service: TimeService,
order_rule: OrderRule,
dag_rpc_rx: aptos_channel::Receiver<Author, IncomingDAGRequest>,
node_receiver: NodeBroadcastHandler,
dag_driver: DagDriver,
fetch_receiver: FetchRequestHandler,
node_fetch_waiter: FetchWaiter<Node>,
certified_node_fetch_waiter: FetchWaiter<CertifiedNode>,
) -> Self {
let rb = Arc::new(ReliableBroadcast::new(
epoch_state.verifier.get_ordered_account_addresses().clone(),
rb_network_sender,
ExponentialBackoff::from_millis(10),
time_service.clone(),
));
let (_dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) =
DagFetcher::new(
epoch_state.clone(),
dag_network_sender,
dag.clone(),
time_service.clone(),
);
let fetch_requester = Arc::new(fetch_requester);
Self {
epoch_state,
dag_rpc_rx,
node_receiver: NodeBroadcastHandler::new(
dag.clone(),
signer.clone(),
epoch_state.clone(),
storage.clone(),
),
dag_driver: DagDriver::new(
signer.author(),
epoch_state.clone(),
dag.clone(),
payload_client,
rb,
1,
time_service,
storage,
order_rule,
fetch_requester,
),
epoch_state: epoch_state.clone(),
fetch_receiver: FetchRequestHandler::new(dag, epoch_state),
node_receiver,
dag_driver,
fetch_receiver,
node_fetch_waiter,
certified_node_fetch_waiter,
}
}

async fn start(mut self) {
pub async fn start(mut self) {
self.dag_driver.try_enter_new_round();

// TODO(ibalajiarun): clean up Reliable Broadcast storage periodically.
Expand Down
1 change: 1 addition & 0 deletions consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![allow(dead_code)]

mod anchor_election;
mod bootstrap;
mod dag_driver;
mod dag_fetcher;
mod dag_handler;
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/rb_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum NodeBroadcastHandleError {
NotEnoughParents,
}

pub struct NodeBroadcastHandler {
pub(crate) struct NodeBroadcastHandler {
dag: Arc<RwLock<Dag>>,
votes_by_round_peer: BTreeMap<Round, BTreeMap<Author, Vote>>,
signer: ValidatorSigner,
Expand Down

0 comments on commit 632fc39

Please sign in to comment.