Skip to content

Commit

Permalink
[dag] dag rebootstrap (#9967)
Browse files Browse the repository at this point in the history
Rebootstrap the DAG afer statesync
  • Loading branch information
ibalajiarun authored Sep 15, 2023
1 parent 01ea6e3 commit fcf58d0
Show file tree
Hide file tree
Showing 13 changed files with 405 additions and 225 deletions.
6 changes: 3 additions & 3 deletions consensus/src/dag/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use futures_channel::mpsc::UnboundedSender;
use std::{collections::HashMap, sync::Arc};

#[async_trait]
pub trait Notifier: Send {
pub trait Notifier: Send + Sync {
fn send_ordered_nodes(
&mut self,
&self,
ordered_nodes: Vec<Arc<CertifiedNode>>,
failed_author: Vec<(Round, Author)>,
) -> anyhow::Result<()>;
Expand Down Expand Up @@ -62,7 +62,7 @@ impl NotifierAdapter {
#[async_trait]
impl Notifier for NotifierAdapter {
fn send_ordered_nodes(
&mut self,
&self,
ordered_nodes: Vec<Arc<CertifiedNode>>,
failed_author: Vec<(Round, Author)>,
) -> anyhow::Result<()> {
Expand Down
318 changes: 229 additions & 89 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -1,133 +1,273 @@
// Copyright © Aptos Foundation

use super::{
adapter::Notifier,
anchor_election::RoundRobinAnchorElection,
dag_driver::DagDriver,
dag_fetcher::{DagFetcherService, FetchRequestHandler},
dag_handler::NetworkHandler,
dag_network::TDAGNetworkSender,
dag_state_sync::DAG_WINDOW,
dag_state_sync::{DagStateSynchronizer, StateSyncTrigger, DAG_WINDOW},
dag_store::Dag,
order_rule::OrderRule,
rb_handler::NodeBroadcastHandler,
storage::DAGStorage,
types::DAGMessage,
types::{CertifiedNodeMessage, DAGMessage},
};
use crate::{
dag::adapter::NotifierAdapter, experimental::buffer_manager::OrderedBlocks,
network::IncomingDAGRequest, state_replication::PayloadClient,
dag::{adapter::NotifierAdapter, dag_fetcher::DagFetcher},
experimental::buffer_manager::OrderedBlocks,
network::IncomingDAGRequest,
state_replication::{PayloadClient, StateComputer},
};
use aptos_channels::{
aptos_channel::{self, Receiver},
message_queues::QueueStyle,
};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_consensus_types::common::Author;
use aptos_crypto::HashValue;
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_types::{
epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner,
aggregate_signature::AggregateSignature,
block_info::BlockInfo,
epoch_state::EpochState,
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
validator_signer::ValidatorSigner,
};
use futures_channel::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
};
use futures::stream::{AbortHandle, Abortable};
use std::{sync::Arc, time::Duration};
use tokio::{select, task::JoinHandle};
use tokio_retry::strategy::ExponentialBackoff;

pub fn bootstrap_dag(
struct DagBootstrapper {
self_peer: Author,
signer: ValidatorSigner,
signer: Arc<ValidatorSigner>,
epoch_state: Arc<EpochState>,
latest_ledger_info: LedgerInfo,
storage: Arc<dyn DAGStorage>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
dag_network_sender: Arc<dyn TDAGNetworkSender>,
time_service: aptos_time_service::TimeService,
payload_client: Arc<dyn PayloadClient>,
) -> (
AbortHandle,
AbortHandle,
aptos_channel::Sender<Author, IncomingDAGRequest>,
futures_channel::mpsc::UnboundedReceiver<OrderedBlocks>,
) {
let validators = epoch_state.verifier.get_ordered_account_addresses();
let current_round = latest_ledger_info.round();
state_computer: Arc<dyn StateComputer>,
}

let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
let adapter = Box::new(NotifierAdapter::new(ordered_nodes_tx, storage.clone()));
let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);
impl DagBootstrapper {
fn new(
self_peer: Author,
signer: Arc<ValidatorSigner>,
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
dag_network_sender: Arc<dyn TDAGNetworkSender>,
time_service: aptos_time_service::TimeService,
payload_client: Arc<dyn PayloadClient>,
state_computer: Arc<dyn StateComputer>,
) -> Self {
Self {
self_peer,
signer,
epoch_state,
storage,
rb_network_sender,
dag_network_sender,
time_service,
payload_client,
state_computer,
}
}

// A backoff policy that starts at 100ms and doubles each iteration.
let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50);
let rb = Arc::new(ReliableBroadcast::new(
validators.clone(),
rb_network_sender,
rb_backoff_policy,
time_service.clone(),
// TODO: add to config
Duration::from_millis(500),
));

let dag = Arc::new(RwLock::new(Dag::new(
epoch_state.clone(),
storage.clone(),
current_round,
DAG_WINDOW,
)));

let anchor_election = Box::new(RoundRobinAnchorElection::new(validators));
let order_rule = OrderRule::new(
epoch_state.clone(),
latest_ledger_info,
dag.clone(),
anchor_election,
adapter,
storage.clone(),
);
fn bootstrap_dag_store(
&self,
latest_ledger_info: LedgerInfo,
notifier: Arc<dyn Notifier>,
) -> (Arc<RwLock<Dag>>, OrderRule) {
let dag = Arc::new(RwLock::new(Dag::new(
self.epoch_state.clone(),
self.storage.clone(),
latest_ledger_info.round(),
DAG_WINDOW,
)));

let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) =
DagFetcherService::new(
epoch_state.clone(),
dag_network_sender,
let validators = self.epoch_state.verifier.get_ordered_account_addresses();
let anchor_election = Box::new(RoundRobinAnchorElection::new(validators));

let order_rule = OrderRule::new(
self.epoch_state.clone(),
latest_ledger_info,
dag.clone(),
time_service.clone(),
anchor_election,
notifier,
self.storage.clone(),
);

(dag, order_rule)
}

fn bootstrap_components(
&self,
dag: Arc<RwLock<Dag>>,
order_rule: OrderRule,
state_sync_trigger: StateSyncTrigger,
) -> (NetworkHandler, DagFetcherService) {
let validators = self.epoch_state.verifier.get_ordered_account_addresses();

// A backoff policy that starts at 100ms and doubles each iteration.
let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50);
let rb = Arc::new(ReliableBroadcast::new(
validators.clone(),
self.rb_network_sender.clone(),
rb_backoff_policy,
self.time_service.clone(),
// TODO: add to config
Duration::from_millis(500),
));

let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) =
DagFetcherService::new(
self.epoch_state.clone(),
self.dag_network_sender.clone(),
dag.clone(),
self.time_service.clone(),
);
let fetch_requester = Arc::new(fetch_requester);

let dag_driver = DagDriver::new(
self.self_peer,
self.epoch_state.clone(),
dag.clone(),
self.payload_client.clone(),
rb,
self.time_service.clone(),
self.storage.clone(),
order_rule,
fetch_requester.clone(),
);
let rb_handler = NodeBroadcastHandler::new(
dag.clone(),
self.signer.clone(),
self.epoch_state.clone(),
self.storage.clone(),
fetch_requester,
);
let fetch_handler = FetchRequestHandler::new(dag, self.epoch_state.clone());

let dag_handler = NetworkHandler::new(
self.epoch_state.clone(),
rb_handler,
dag_driver,
fetch_handler,
node_fetch_waiter,
certified_node_fetch_waiter,
state_sync_trigger,
);

(dag_handler, dag_fetcher)
}

async fn bootstrapper(
self,
mut dag_rpc_rx: Receiver<Author, IncomingDAGRequest>,
ordered_nodes_tx: UnboundedSender<OrderedBlocks>,
mut shutdown_rx: oneshot::Receiver<()>,
) {
let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, self.storage.clone()));

let sync_manager = DagStateSynchronizer::new(
self.epoch_state.clone(),
adapter.clone(),
self.time_service.clone(),
self.state_computer.clone(),
self.storage.clone(),
);

// TODO: fetch the correct block info
let ledger_info = LedgerInfoWithSignatures::new(
LedgerInfo::new(BlockInfo::empty(), HashValue::zero()),
AggregateSignature::empty(),
);
let fetch_requester = Arc::new(fetch_requester);

let dag_driver = DagDriver::new(
loop {
let (dag_store, order_rule) =
self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone());

let state_sync_trigger = StateSyncTrigger::new(dag_store.clone(), adapter.clone());

let (handler, fetch_service) =
self.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger);

let df_handle = tokio::spawn(fetch_service.start());

// poll the network handler while waiting for rebootstrap notification or shutdown notification
select! {
biased;
_ = &mut shutdown_rx => {
df_handle.abort();
let _ = df_handle.await;
return;
},
certified_node_msg = handler.run(&mut dag_rpc_rx) => {
df_handle.abort();
let _ = df_handle.await;

let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone());

if let Err(e) = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone()).await {
error!(error = ?e, "unable to sync");
}
}
}
}
}
}

pub(super) fn bootstrap_dag_for_test(
self_peer: Author,
signer: ValidatorSigner,
epoch_state: Arc<EpochState>,
latest_ledger_info: LedgerInfo,
storage: Arc<dyn DAGStorage>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
dag_network_sender: Arc<dyn TDAGNetworkSender>,
time_service: aptos_time_service::TimeService,
payload_client: Arc<dyn PayloadClient>,
state_computer: Arc<dyn StateComputer>,
) -> (
JoinHandle<CertifiedNodeMessage>,
JoinHandle<()>,
aptos_channel::Sender<Author, IncomingDAGRequest>,
UnboundedReceiver<OrderedBlocks>,
) {
let bootstraper = DagBootstrapper::new(
self_peer,
epoch_state.clone(),
dag.clone(),
payload_client,
rb,
time_service,
storage.clone(),
order_rule,
fetch_requester.clone(),
);
let rb_handler = NodeBroadcastHandler::new(
dag.clone(),
signer,
epoch_state.clone(),
signer.into(),
epoch_state,
storage.clone(),
fetch_requester,
rb_network_sender,
dag_network_sender,
time_service,
payload_client,
state_computer,
);
let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone());

let dag_handler = NetworkHandler::new(
epoch_state,
dag_rpc_rx,
rb_handler,
dag_driver,
fetch_handler,
node_fetch_waiter,
certified_node_fetch_waiter,
);
let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, storage.clone()));
let (dag_rpc_tx, mut dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);

let (dag_store, order_rule) =
bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone());

let state_sync_trigger = StateSyncTrigger::new(dag_store.clone(), adapter.clone());

let (nh_abort_handle, nh_abort_registration) = AbortHandle::new_pair();
let (df_abort_handle, df_abort_registration) = AbortHandle::new_pair();
let (handler, fetch_service) =
bootstraper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger);

tokio::spawn(Abortable::new(dag_handler.start(), nh_abort_registration));
tokio::spawn(Abortable::new(dag_fetcher.start(), df_abort_registration));
let dh_handle = tokio::spawn(async move { handler.run(&mut dag_rpc_rx).await });
let df_handle = tokio::spawn(fetch_service.start());

(
nh_abort_handle,
df_abort_handle,
dag_rpc_tx,
ordered_nodes_rx,
)
(dh_handle, df_handle, dag_rpc_tx, ordered_nodes_rx)
}
1 change: 0 additions & 1 deletion consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ impl DagDriver {
.broadcast(node.clone(), signature_builder)
.then(move |certificate| {
let certified_node = CertifiedNode::new(node, certificate.signatures().to_owned());

let certified_node_msg =
CertifiedNodeMessage::new(certified_node, latest_ledger_info);
rb.broadcast(certified_node_msg, cert_ack_set)
Expand Down
Loading

0 comments on commit fcf58d0

Please sign in to comment.