Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dag] dag rebootstrap #9967

Merged
merged 6 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions consensus/src/dag/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use futures_channel::mpsc::UnboundedSender;
use std::{collections::HashMap, sync::Arc};

#[async_trait]
pub trait Notifier: Send {
pub trait Notifier: Send + Sync {
fn send_ordered_nodes(
&mut self,
&self,
ordered_nodes: Vec<Arc<CertifiedNode>>,
failed_author: Vec<(Round, Author)>,
) -> anyhow::Result<()>;
Expand Down Expand Up @@ -62,7 +62,7 @@ impl NotifierAdapter {
#[async_trait]
impl Notifier for NotifierAdapter {
fn send_ordered_nodes(
&mut self,
&self,
ordered_nodes: Vec<Arc<CertifiedNode>>,
failed_author: Vec<(Round, Author)>,
) -> anyhow::Result<()> {
Expand Down
318 changes: 229 additions & 89 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -1,133 +1,273 @@
// Copyright © Aptos Foundation

use super::{
adapter::Notifier,
anchor_election::RoundRobinAnchorElection,
dag_driver::DagDriver,
dag_fetcher::{DagFetcherService, FetchRequestHandler},
dag_handler::NetworkHandler,
dag_network::TDAGNetworkSender,
dag_state_sync::DAG_WINDOW,
dag_state_sync::{DagStateSynchronizer, StateSyncTrigger, DAG_WINDOW},
dag_store::Dag,
order_rule::OrderRule,
rb_handler::NodeBroadcastHandler,
storage::DAGStorage,
types::DAGMessage,
types::{CertifiedNodeMessage, DAGMessage},
};
use crate::{
dag::adapter::NotifierAdapter, experimental::buffer_manager::OrderedBlocks,
network::IncomingDAGRequest, state_replication::PayloadClient,
dag::{adapter::NotifierAdapter, dag_fetcher::DagFetcher},
experimental::buffer_manager::OrderedBlocks,
network::IncomingDAGRequest,
state_replication::{PayloadClient, StateComputer},
};
use aptos_channels::{
aptos_channel::{self, Receiver},
message_queues::QueueStyle,
};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_consensus_types::common::Author;
use aptos_crypto::HashValue;
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_types::{
epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner,
aggregate_signature::AggregateSignature,
block_info::BlockInfo,
epoch_state::EpochState,
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
validator_signer::ValidatorSigner,
};
use futures_channel::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
};
use futures::stream::{AbortHandle, Abortable};
use std::{sync::Arc, time::Duration};
use tokio::{select, task::JoinHandle};
use tokio_retry::strategy::ExponentialBackoff;

pub fn bootstrap_dag(
struct DagBootstrapper {
self_peer: Author,
signer: ValidatorSigner,
signer: Arc<ValidatorSigner>,
epoch_state: Arc<EpochState>,
latest_ledger_info: LedgerInfo,
storage: Arc<dyn DAGStorage>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
dag_network_sender: Arc<dyn TDAGNetworkSender>,
time_service: aptos_time_service::TimeService,
payload_client: Arc<dyn PayloadClient>,
) -> (
AbortHandle,
AbortHandle,
aptos_channel::Sender<Author, IncomingDAGRequest>,
futures_channel::mpsc::UnboundedReceiver<OrderedBlocks>,
) {
let validators = epoch_state.verifier.get_ordered_account_addresses();
let current_round = latest_ledger_info.round();
state_computer: Arc<dyn StateComputer>,
}

let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
let adapter = Box::new(NotifierAdapter::new(ordered_nodes_tx, storage.clone()));
let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);
impl DagBootstrapper {
fn new(
self_peer: Author,
signer: Arc<ValidatorSigner>,
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
dag_network_sender: Arc<dyn TDAGNetworkSender>,
time_service: aptos_time_service::TimeService,
payload_client: Arc<dyn PayloadClient>,
state_computer: Arc<dyn StateComputer>,
) -> Self {
Self {
self_peer,
signer,
epoch_state,
storage,
rb_network_sender,
dag_network_sender,
time_service,
payload_client,
state_computer,
}
}

// A backoff policy that starts at 100ms and doubles each iteration.
let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50);
let rb = Arc::new(ReliableBroadcast::new(
validators.clone(),
rb_network_sender,
rb_backoff_policy,
time_service.clone(),
// TODO: add to config
Duration::from_millis(500),
));

let dag = Arc::new(RwLock::new(Dag::new(
epoch_state.clone(),
storage.clone(),
current_round,
DAG_WINDOW,
)));

let anchor_election = Box::new(RoundRobinAnchorElection::new(validators));
let order_rule = OrderRule::new(
epoch_state.clone(),
latest_ledger_info,
dag.clone(),
anchor_election,
adapter,
storage.clone(),
);
fn bootstrap_dag_store(
&self,
latest_ledger_info: LedgerInfo,
notifier: Arc<dyn Notifier>,
) -> (Arc<RwLock<Dag>>, OrderRule) {
let dag = Arc::new(RwLock::new(Dag::new(
self.epoch_state.clone(),
self.storage.clone(),
latest_ledger_info.round(),
DAG_WINDOW,
)));

let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) =
DagFetcherService::new(
epoch_state.clone(),
dag_network_sender,
let validators = self.epoch_state.verifier.get_ordered_account_addresses();
let anchor_election = Box::new(RoundRobinAnchorElection::new(validators));

let order_rule = OrderRule::new(
self.epoch_state.clone(),
latest_ledger_info,
dag.clone(),
time_service.clone(),
anchor_election,
notifier,
self.storage.clone(),
);

(dag, order_rule)
}

fn bootstrap_components(
&self,
dag: Arc<RwLock<Dag>>,
order_rule: OrderRule,
state_sync_trigger: StateSyncTrigger,
) -> (NetworkHandler, DagFetcherService) {
let validators = self.epoch_state.verifier.get_ordered_account_addresses();

// A backoff policy that starts at 100ms and doubles each iteration.
let rb_backoff_policy = ExponentialBackoff::from_millis(2).factor(50);
let rb = Arc::new(ReliableBroadcast::new(
validators.clone(),
self.rb_network_sender.clone(),
rb_backoff_policy,
self.time_service.clone(),
// TODO: add to config
Duration::from_millis(500),
));

let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) =
DagFetcherService::new(
self.epoch_state.clone(),
self.dag_network_sender.clone(),
dag.clone(),
self.time_service.clone(),
);
let fetch_requester = Arc::new(fetch_requester);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why can't you return an `Arc` directly from `DagFetcherService::new`?


let dag_driver = DagDriver::new(
self.self_peer,
self.epoch_state.clone(),
dag.clone(),
self.payload_client.clone(),
rb,
self.time_service.clone(),
self.storage.clone(),
order_rule,
fetch_requester.clone(),
);
let rb_handler = NodeBroadcastHandler::new(
dag.clone(),
self.signer.clone(),
self.epoch_state.clone(),
self.storage.clone(),
fetch_requester,
);
let fetch_handler = FetchRequestHandler::new(dag, self.epoch_state.clone());

let dag_handler = NetworkHandler::new(
self.epoch_state.clone(),
rb_handler,
dag_driver,
fetch_handler,
node_fetch_waiter,
certified_node_fetch_waiter,
state_sync_trigger,
);

(dag_handler, dag_fetcher)
}

async fn bootstrapper(
self,
mut dag_rpc_rx: Receiver<Author, IncomingDAGRequest>,
ordered_nodes_tx: UnboundedSender<OrderedBlocks>,
mut shutdown_rx: oneshot::Receiver<()>,
) {
let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, self.storage.clone()));

let sync_manager = DagStateSynchronizer::new(
self.epoch_state.clone(),
adapter.clone(),
self.time_service.clone(),
self.state_computer.clone(),
self.storage.clone(),
);

// TODO: fetch the correct block info
let ledger_info = LedgerInfoWithSignatures::new(
LedgerInfo::new(BlockInfo::empty(), HashValue::zero()),
AggregateSignature::empty(),
);
let fetch_requester = Arc::new(fetch_requester);

let dag_driver = DagDriver::new(
loop {
let (dag_store, order_rule) =
self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone());

let state_sync_trigger = StateSyncTrigger::new(dag_store.clone(), adapter.clone());

let (handler, fetch_service) =
self.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger);

let df_handle = tokio::spawn(fetch_service.start());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we could just create a drop guard here and avoid repeating the abort/await lines in both branches.


// poll the network handler while waiting for rebootstrap notification or shutdown notification
select! {
biased;
_ = &mut shutdown_rx => {
df_handle.abort();
let _ = df_handle.await;
return;
},
certified_node_msg = handler.run(&mut dag_rpc_rx) => {
df_handle.abort();
let _ = df_handle.await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably should have a guard for these services


let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone());

if let Err(e) = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone()).await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i thought we're creating a new dag store instead of re-using the current one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we are. I pass the existing dag store to do some assertion checks on whether to actually state sync.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, that sounds weird, the check should be done in the check function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I am being paranoid: I check in the check function, and I also assert in the sync_to function.

error!(error = ?e, "unable to sync");
}
}
}
}
}
}

pub(super) fn bootstrap_dag_for_test(
self_peer: Author,
signer: ValidatorSigner,
epoch_state: Arc<EpochState>,
latest_ledger_info: LedgerInfo,
storage: Arc<dyn DAGStorage>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
dag_network_sender: Arc<dyn TDAGNetworkSender>,
time_service: aptos_time_service::TimeService,
payload_client: Arc<dyn PayloadClient>,
state_computer: Arc<dyn StateComputer>,
) -> (
JoinHandle<CertifiedNodeMessage>,
JoinHandle<()>,
aptos_channel::Sender<Author, IncomingDAGRequest>,
UnboundedReceiver<OrderedBlocks>,
) {
let bootstraper = DagBootstrapper::new(
self_peer,
epoch_state.clone(),
dag.clone(),
payload_client,
rb,
time_service,
storage.clone(),
order_rule,
fetch_requester.clone(),
);
let rb_handler = NodeBroadcastHandler::new(
dag.clone(),
signer,
epoch_state.clone(),
signer.into(),
epoch_state,
storage.clone(),
fetch_requester,
rb_network_sender,
dag_network_sender,
time_service,
payload_client,
state_computer,
);
let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone());

let dag_handler = NetworkHandler::new(
epoch_state,
dag_rpc_rx,
rb_handler,
dag_driver,
fetch_handler,
node_fetch_waiter,
certified_node_fetch_waiter,
);
let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, storage.clone()));
let (dag_rpc_tx, mut dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);

let (dag_store, order_rule) =
bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone());

let state_sync_trigger = StateSyncTrigger::new(dag_store.clone(), adapter.clone());

let (nh_abort_handle, nh_abort_registration) = AbortHandle::new_pair();
let (df_abort_handle, df_abort_registration) = AbortHandle::new_pair();
let (handler, fetch_service) =
bootstraper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger);

tokio::spawn(Abortable::new(dag_handler.start(), nh_abort_registration));
tokio::spawn(Abortable::new(dag_fetcher.start(), df_abort_registration));
let dh_handle = tokio::spawn(async move { handler.run(&mut dag_rpc_rx).await });
let df_handle = tokio::spawn(fetch_service.start());

(
nh_abort_handle,
df_abort_handle,
dag_rpc_tx,
ordered_nodes_rx,
)
(dh_handle, df_handle, dag_rpc_tx, ordered_nodes_rx)
}
1 change: 0 additions & 1 deletion consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ impl DagDriver {
.broadcast(node.clone(), signature_builder)
.then(move |certificate| {
let certified_node = CertifiedNode::new(node, certificate.signatures().to_owned());

let certified_node_msg =
CertifiedNodeMessage::new(certified_node, latest_ledger_info);
rb.broadcast(certified_node_msg, cert_ack_set)
Expand Down
Loading
Loading