Skip to content

Commit

Permalink
[dag] bootstrap logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 2, 2023
1 parent b71af2f commit 5f62009
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 71 deletions.
2 changes: 1 addition & 1 deletion consensus/src/dag/anchor_election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use aptos_consensus_types::common::{Author, Round};

pub trait AnchorElection {
pub trait AnchorElection: Send {
fn get_anchor(&self, round: Round) -> Author;

fn commit(&mut self, round: Round);
Expand Down
117 changes: 117 additions & 0 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright © Aptos Foundation

use super::{
anchor_election::RoundRobinAnchorElection,
dag_driver::DagDriver,
dag_fetcher::{DagFetcher, FetchRequestHandler},
dag_handler::NetworkHandler,
dag_network::DAGNetworkSender,
dag_store::Dag,
order_rule::OrderRule,
rb_handler::NodeBroadcastHandler,
storage::DAGStorage,
types::DAGMessage,
CertifiedNode,
};
use crate::{
network::IncomingDAGRequest, state_replication::PayloadClient, util::time_service::TimeService,
};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_types::{
epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner,
};
use futures::stream::{AbortHandle, Abortable};
use std::sync::Arc;
use tokio_retry::strategy::ExponentialBackoff;

pub fn bootstrap_dag(
self_peer: Author,
signer: ValidatorSigner,
epoch_state: Arc<EpochState>,
latest_ledger_info: LedgerInfo,
storage: Arc<dyn DAGStorage>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
dag_network_sender: Arc<dyn DAGNetworkSender>,
time_service: Arc<dyn TimeService>,
aptos_time_service: aptos_time_service::TimeService,
payload_client: Arc<dyn PayloadClient>,
) -> (
AbortHandle,
AbortHandle,
aptos_channel::Sender<Author, IncomingDAGRequest>,
futures_channel::mpsc::UnboundedReceiver<Vec<Arc<CertifiedNode>>>,
) {
let validators = epoch_state.verifier.get_ordered_account_addresses();
let current_round = latest_ledger_info.round();

let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);

// A backoff policy that starts at 100ms and doubles each iteration.
let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50);
let rb = Arc::new(ReliableBroadcast::new(
validators.clone(),
rb_network_sender,
rb_backoff_policy,
aptos_time_service.clone(),
));

let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone())));

let anchor_election = Box::new(RoundRobinAnchorElection::new(validators));
let order_rule = OrderRule::new(
epoch_state.clone(),
latest_ledger_info,
dag.clone(),
anchor_election,
ordered_nodes_tx,
);

let (dag_fetcher, fetch_requester) = DagFetcher::new(
epoch_state.clone(),
dag_network_sender,
dag.clone(),
aptos_time_service,
);
let fetch_requester = Arc::new(fetch_requester);

let dag_driver = DagDriver::new(
self_peer,
epoch_state.clone(),
dag.clone(),
payload_client,
rb,
current_round,
time_service,
storage.clone(),
order_rule,
fetch_requester,
);
let rb_handler =
NodeBroadcastHandler::new(dag.clone(), signer, epoch_state.clone(), storage.clone());
let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone());

let dag_handler = NetworkHandler::new(
epoch_state,
dag_rpc_rx,
rb_handler,
dag_driver,
fetch_handler,
);

let (nh_abort_handle, nh_abort_registration) = AbortHandle::new_pair();
let (df_abort_handle, df_abort_registration) = AbortHandle::new_pair();

tokio::spawn(Abortable::new(dag_handler.start(), nh_abort_registration));
tokio::spawn(Abortable::new(dag_fetcher.start(), df_abort_registration));

(
nh_abort_handle,
df_abort_handle,
dag_rpc_tx,
ordered_nodes_rx,
)
}
5 changes: 4 additions & 1 deletion consensus/src/dag/dag_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ use aptos_logger::error;
use aptos_time_service::TimeService;
use aptos_types::epoch_state::EpochState;
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::sync::{oneshot, mpsc::{Sender, Receiver}};
use std::{collections::HashMap, sync::Arc, time::Duration};
use thiserror::Error as ThisError;
use tokio::sync::{
mpsc::{Receiver, Sender},
oneshot,
};

pub struct FetchRequester {
request_tx: Sender<LocalFetchRequest>,
Expand Down
79 changes: 15 additions & 64 deletions consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
@@ -1,95 +1,46 @@
// Copyright © Aptos Foundation

use super::{
dag_driver::DagDriver,
dag_fetcher::{DagFetcher, FetchRequestHandler},
dag_network::DAGNetworkSender,
order_rule::OrderRule,
storage::DAGStorage,
types::TDAGMessage,
};
use super::{dag_driver::DagDriver, dag_fetcher::FetchRequestHandler, types::TDAGMessage};
use crate::{
dag::{
dag_network::RpcHandler, dag_store::Dag, rb_handler::NodeBroadcastHandler,
types::DAGMessage,
},
dag::{dag_network::RpcHandler, rb_handler::NodeBroadcastHandler, types::DAGMessage},
network::{IncomingDAGRequest, TConsensusMsg},
state_replication::PayloadClient,
util::time_service::TimeService,
};
use anyhow::bail;
use aptos_channels::aptos_channel;
use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_logger::{error, warn};
use aptos_network::protocols::network::RpcError;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_types::{epoch_state::EpochState, validator_signer::ValidatorSigner};
use aptos_types::epoch_state::EpochState;
use bytes::Bytes;
use futures::StreamExt;
use std::sync::Arc;
use tokio_retry::strategy::ExponentialBackoff;

struct NetworkHandler {
pub(crate) struct NetworkHandler {
epoch_state: Arc<EpochState>,
dag_rpc_rx: aptos_channel::Receiver<Author, IncomingDAGRequest>,
node_receiver: NodeBroadcastHandler,
dag_driver: DagDriver,
fetch_receiver: FetchRequestHandler,
epoch_state: Arc<EpochState>,
}

impl NetworkHandler {
fn new(
dag: Arc<RwLock<Dag>>,
dag_rpc_rx: aptos_channel::Receiver<Author, IncomingDAGRequest>,
signer: ValidatorSigner,
pub fn new(
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
payload_client: Arc<dyn PayloadClient>,
dag_network_sender: Arc<dyn DAGNetworkSender>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
time_service: Arc<dyn TimeService>,
aptos_time_service: aptos_time_service::TimeService,
order_rule: OrderRule,
dag_rpc_rx: aptos_channel::Receiver<Author, IncomingDAGRequest>,
node_receiver: NodeBroadcastHandler,
dag_driver: DagDriver,
fetch_receiver: FetchRequestHandler,
) -> Self {
let rb = Arc::new(ReliableBroadcast::new(
epoch_state.verifier.get_ordered_account_addresses().clone(),
rb_network_sender,
ExponentialBackoff::from_millis(10),
aptos_time_service.clone(),
));
let (dag_fetcher, fetch_requester) = DagFetcher::new(
epoch_state.clone(),
dag_network_sender,
dag.clone(),
aptos_time_service,
);
Self {
epoch_state,
dag_rpc_rx,
node_receiver: NodeBroadcastHandler::new(
dag.clone(),
signer.clone(),
epoch_state.clone(),
storage.clone(),
),
dag_driver: DagDriver::new(
signer.author(),
epoch_state.clone(),
dag.clone(),
payload_client,
rb,
1,
time_service,
storage,
order_rule,
Arc::new(fetch_requester),
),
epoch_state: epoch_state.clone(),
fetch_receiver: FetchRequestHandler::new(dag, epoch_state),
node_receiver,
dag_driver,
fetch_receiver,
}
}

async fn start(mut self) {
pub async fn start(mut self) {
self.dag_driver.try_enter_new_round();

// TODO(ibalajiarun): clean up Reliable Broadcast storage periodically.
Expand Down
1 change: 1 addition & 0 deletions consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![allow(dead_code)]

mod anchor_election;
mod bootstrap;
mod dag_driver;
mod dag_fetcher;
mod dag_handler;
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/rb_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum NodeBroadcastHandleError {
NotEnoughParents,
}

pub struct NodeBroadcastHandler {
pub(crate) struct NodeBroadcastHandler {
dag: Arc<RwLock<Dag>>,
votes_by_round_peer: BTreeMap<Round, BTreeMap<Author, Vote>>,
signer: ValidatorSigner,
Expand Down
18 changes: 14 additions & 4 deletions consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,25 @@

use crate::{
dag::{
anchor_election::RoundRobinAnchorElection,
dag_driver::{DagDriver, DagDriverError},
dag_fetcher::FetchRequester,
dag_network::{DAGNetworkSender, RpcWithFallback},
dag_store::Dag,
order_rule::OrderRule,
tests::{dag_test::MockStorage, helpers::new_certified_node},
types::{CertifiedAck, DAGMessage},
RpcHandler, order_rule::OrderRule,
anchor_election::RoundRobinAnchorElection, dag_fetcher::FetchRequester,
RpcHandler,
},
test_utils::MockPayloadManager,
util::mock_time_service::SimulatedTimeService,
};
use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_types::{epoch_state::EpochState, validator_verifier::random_validator_verifier, ledger_info::LedgerInfo};
use aptos_types::{
epoch_state::EpochState, ledger_info::LedgerInfo, validator_verifier::random_validator_verifier,
};
use async_trait::async_trait;
use claims::{assert_ok, assert_ok_eq};
use std::{sync::Arc, time::Duration};
Expand Down Expand Up @@ -81,7 +85,13 @@ fn test_certified_node_handler() {
let time_service = Arc::new(SimulatedTimeService::new());
let (ordered_nodes_sender, _) = futures_channel::mpsc::unbounded();
let validators = signers.iter().map(|vs| vs.author()).collect();
let order_rule = OrderRule::new(epoch_state.clone(), LedgerInfo::mock_genesis(None), dag.clone(), Box::new(RoundRobinAnchorElection::new(validators)), ordered_nodes_sender);
let order_rule = OrderRule::new(
epoch_state.clone(),
LedgerInfo::mock_genesis(None),
dag.clone(),
Box::new(RoundRobinAnchorElection::new(validators)),
ordered_nodes_sender,
);

let (request_tx, _) = tokio::sync::mpsc::channel(10);
let fetch_requester = Arc::new(FetchRequester::new(request_tx));
Expand Down

0 comments on commit 5f62009

Please sign in to comment.