diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs
index 95dc77e98194b..8954ed5e452f8 100644
--- a/consensus/src/dag/bootstrap.rs
+++ b/consensus/src/dag/bootstrap.rs
@@ -3,7 +3,7 @@
 use super::{
     anchor_election::RoundRobinAnchorElection,
     dag_driver::DagDriver,
-    dag_fetcher::{DagFetcher, FetchRequestHandler},
+    dag_fetcher::{DagFetcherService, FetchRequestHandler},
     dag_handler::NetworkHandler,
     dag_network::TDAGNetworkSender,
     dag_store::Dag,
@@ -76,7 +76,7 @@ pub fn bootstrap_dag(
     );

     let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) =
-        DagFetcher::new(
+        DagFetcherService::new(
             epoch_state.clone(),
             dag_network_sender,
             dag.clone(),
diff --git a/consensus/src/dag/dag_fetcher.rs b/consensus/src/dag/dag_fetcher.rs
index 0ddddf30f8a6d..b5093deb0288a 100644
--- a/consensus/src/dag/dag_fetcher.rs
+++ b/consensus/src/dag/dag_fetcher.rs
@@ -7,8 +7,8 @@ use crate::dag::{
     dag_store::Dag,
     types::{CertifiedNode, FetchResponse, Node, RemoteFetchRequest},
 };
-use anyhow::ensure;
-use aptos_consensus_types::common::Author;
+use anyhow::{anyhow, ensure};
+use aptos_consensus_types::common::{Author, Round};
 use aptos_infallible::RwLock;
 use aptos_logger::error;
 use aptos_time_service::TimeService;
@@ -124,15 +124,13 @@ impl LocalFetchRequest {
     }
 }

-pub struct DagFetcher {
-    epoch_state: Arc<EpochState>,
-    network: Arc<dyn TDAGNetworkSender>,
-    dag: Arc<RwLock<Dag>>,
+pub struct DagFetcherService {
+    inner: DagFetcher,
+    ordered_authors: Vec<Author>,
     request_rx: Receiver<LocalFetchRequest>,
-    time_service: TimeService,
 }

-impl DagFetcher {
+impl DagFetcherService {
     pub fn new(
         epoch_state: Arc<EpochState>,
         network: Arc<dyn TDAGNetworkSender>,
         dag: Arc<RwLock<Dag>>,
@@ -147,13 +145,12 @@ impl DagFetcher {
         let (request_tx, request_rx) = tokio::sync::mpsc::channel(16);
         let (node_tx, node_rx) = tokio::sync::mpsc::channel(100);
         let (certified_node_tx, certified_node_rx) = tokio::sync::mpsc::channel(100);
+        let ordered_authors = epoch_state.verifier.get_ordered_account_addresses();
         (
             Self {
-                epoch_state,
-                network,
-                dag,
+                inner: DagFetcher::new(epoch_state, network, dag, time_service),
                 request_rx,
-                time_service,
+                ordered_authors,
             },
             FetchRequester {
                 request_tx,
@@ -167,68 +164,116 @@ impl DagFetcher {
     pub async fn start(mut self) {
         while let Some(local_request) = self.request_rx.recv().await {
-            let responders = local_request
-                .responders(&self.epoch_state.verifier.get_ordered_account_addresses());
-            let remote_request = {
-                let dag_reader = self.dag.read();
-
-                let missing_parents: Vec<NodeMetadata> = dag_reader
-                    .filter_missing(local_request.node().parents_metadata())
-                    .cloned()
-                    .collect();
-
-                if missing_parents.is_empty() {
-                    local_request.notify();
-                    continue;
-                }
-
-                let target = local_request.node();
-                RemoteFetchRequest::new(
-                    target.metadata().epoch(),
-                    missing_parents,
-                    dag_reader.bitmask(local_request.node().round()),
+            match self
+                .fetch_for_node(
+                    local_request.node(),
+                    local_request.responders(&self.ordered_authors),
                 )
-            };
-
-            let mut rpc = RpcWithFallback::new(
-                responders,
-                remote_request.clone().into(),
-                Duration::from_millis(500),
-                Duration::from_secs(1),
-                self.network.clone(),
-                self.time_service.clone(),
-            );
-            while let Some(response) = rpc.next().await {
-                if let Ok(response) =
-                    response
-                        .and_then(FetchResponse::try_from)
-                        .and_then(|response| {
-                            response.verify(&remote_request, &self.epoch_state.verifier)
-                        })
+                .await
+            {
+                Ok(_) => local_request.notify(),
+                Err(err) => error!("unable to complete fetch successfully: {}", err),
+            }
+        }
+    }
+
+    pub(super) async fn fetch_for_node(
+        &mut self,
+        node: &Node,
+        responders: Vec<Author>,
+    ) -> anyhow::Result<()> {
+        let remote_request = {
+            let dag_reader = self.inner.dag.read();
+
+            let missing_parents: Vec<NodeMetadata> = dag_reader
+                .filter_missing(node.parents_metadata())
+                .cloned()
+                .collect();
+
+            if missing_parents.is_empty() {
+                return Ok(());
+            }
+
+            RemoteFetchRequest::new(
+                node.metadata().epoch(),
+                missing_parents,
+                dag_reader.bitmask(node.round()),
+            )
+        };
+        let target_metadata = node
+            .parents_metadata()
+            .map(|metadata| (metadata.round(), *metadata.author()))
+            .collect();
+        self.inner
+            .fetch(remote_request, responders, target_metadata)
+            .await
+    }
+}
+
+pub(crate) struct DagFetcher {
+    network: Arc<dyn TDAGNetworkSender>,
+    time_service: TimeService,
+    epoch_state: Arc<EpochState>,
+    dag: Arc<RwLock<Dag>>,
+}
+
+impl DagFetcher {
+    pub(crate) fn new(
+        epoch_state: Arc<EpochState>,
+        network: Arc<dyn TDAGNetworkSender>,
+        dag: Arc<RwLock<Dag>>,
+        time_service: TimeService,
+    ) -> Self {
+        Self {
+            network,
+            time_service,
+            epoch_state,
+            dag,
+        }
+    }
+
+    pub(crate) async fn fetch<'a>(
+        &self,
+        remote_request: RemoteFetchRequest,
+        responders: Vec<Author>,
+        target_metadata: Vec<(Round, Author)>,
+    ) -> anyhow::Result<()> {
+        let mut rpc = RpcWithFallback::new(
+            responders,
+            remote_request.clone().into(),
+            Duration::from_millis(500),
+            Duration::from_secs(1),
+            self.network.clone(),
+            self.time_service.clone(),
+        );
+
+        // TODO retry
+        while let Some(response) = rpc.next().await {
+            if let Ok(response) = response
+                .and_then(FetchResponse::try_from)
+                .and_then(|response| response.verify(&remote_request, &self.epoch_state.verifier))
+            {
+                let certified_nodes = response.certified_nodes();
+                // TODO: support chunk response or fallback to state sync
                 {
-                    let certified_nodes = response.certified_nodes();
-                    // TODO: support chunk response or fallback to state sync
-                    {
-                        let mut dag_writer = self.dag.write();
-                        for node in certified_nodes {
-                            if let Err(e) = dag_writer.add_node(node) {
-                                error!("Failed to add node {}", e);
-                            }
+                    let mut dag_writer = self.dag.write();
+                    for node in certified_nodes {
+                        if let Err(e) = dag_writer.add_node(node) {
+                            error!("Failed to add node {}", e);
                         }
                     }
+                }

-                    if self
-                        .dag
-                        .read()
-                        .all_exists(local_request.node().parents_metadata())
-                    {
-                        local_request.notify();
-                        break;
-                    }
+                if self
+                    .dag
+                    .read()
+                    .all_exists_by_round_author(target_metadata.iter())
+                {
+                    return Ok(());
                 }
-                // TODO retry
             }
         }
+        Err(anyhow!("fetch failed"))
     }
 }
diff --git a/consensus/src/dag/dag_store.rs b/consensus/src/dag/dag_store.rs
index a820ca56fa5c2..47ef6b40457d4 100644
--- a/consensus/src/dag/dag_store.rs
+++ b/consensus/src/dag/dag_store.rs
@@ -135,6 +135,10 @@ impl Dag {
         self.filter_missing(nodes).next().is_none()
     }

+    pub fn all_exists_by_round_author<'a>(&self, mut nodes: impl Iterator<Item = &'a (Round, Author)>) -> bool {
+        nodes.all(|(round, author)| self.get_node_ref(*round, author).is_some())
+    }
+
     pub fn filter_missing<'a, 'b>(
         &'b self,
         nodes: impl Iterator<Item = &'a NodeMetadata> + 'b,
diff --git a/consensus/src/dag/mod.rs b/consensus/src/dag/mod.rs
index 5b2c488ca84a4..e208a3b9039e5 100644
--- a/consensus/src/dag/mod.rs
+++ b/consensus/src/dag/mod.rs
@@ -16,6 +16,7 @@ mod storage;
 #[cfg(test)]
 mod tests;
 mod types;
+mod state_sync;

 pub use dag_network::{RpcHandler, RpcWithFallback, TDAGNetworkSender};
 pub use types::{CertifiedNode, DAGMessage, DAGNetworkMessage, Extensions, Node, NodeId, Vote};
diff --git a/consensus/src/dag/state_sync.rs b/consensus/src/dag/state_sync.rs
new file mode 100644
index 0000000000000..61d6b6df45d14
--- /dev/null
+++ b/consensus/src/dag/state_sync.rs
@@ -0,0 +1,54 @@
+use super::{
+    dag_fetcher::DagFetcher,
+    dag_store::Dag,
+    storage::DAGStorage,
+    types::{NodeMetadata, RemoteFetchRequest, RemoteFetchRequestTargets},
+    NodeId, TDAGNetworkSender,
+};
+use crate::state_replication::StateComputer;
+use aptos_infallible::RwLock;
+use aptos_time_service::TimeService;
+use aptos_types::{epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures};
+use std::sync::Arc;
+
+pub const DAG_WINDOW: u64 = 10;
+
+struct StateSyncAdapter {
+    epoch_state: Arc<EpochState>,
+    network: Arc<dyn TDAGNetworkSender>,
+    time_service: TimeService,
+    state_computer: Arc<dyn StateComputer>,
+    storage: Arc<dyn DAGStorage>,
+}
+
+impl StateSyncAdapter {
+    async fn sync_to_ledger_info(&self, target: LedgerInfoWithSignatures) -> anyhow::Result<()> {
+        // Check whether to actually sync
+
+        // TODO: there is a case where DAG fetches missing nodes in window, and a crash happens and when we restart,
+        // we end up with a gap between the DAG and we need to be smart enough to clean up the DAG before the gap.
+
+        // Create a new DAG store and fetch blocks
+        let target_round = target.ledger_info().round();
+        let initial_round = target_round.saturating_sub(DAG_WINDOW);
+        let dag = Arc::new(RwLock::new(Dag::new(
+            self.epoch_state.clone(),
+            self.storage.clone(),
+            initial_round,
+        )));
+        let fetcher = DagFetcher::new(
+            self.epoch_state.clone(),
+            self.network.clone(),
+            dag.clone(),
+            self.time_service.clone(),
+        );
+
+        // TODO: derive the real fetch targets (the committed anchor and its causal history within
+        // the window) and the responders from the commit certificate; the empty placeholders below
+        // only wire up the fetch flow.
+        let responders = self.epoch_state.verifier.get_ordered_account_addresses();
+        let request = RemoteFetchRequest::new(
+            self.epoch_state.epoch,
+            RemoteFetchRequestTargets::ByMetadata(vec![]),
+            dag.read().bitmask(target_round),
+        );
+        fetcher.fetch(request, responders, vec![]).await?;
+
+        // State sync
+        self.state_computer.sync_to(target).await?;
+
+        Ok(())
+    }
+}
diff --git a/consensus/src/dag/tests/dag_driver_tests.rs b/consensus/src/dag/tests/dag_driver_tests.rs
index 940dfc5c9fc20..f38516ab1b8ca 100644
--- a/consensus/src/dag/tests/dag_driver_tests.rs
+++ b/consensus/src/dag/tests/dag_driver_tests.rs
@@ -4,7 +4,7 @@ use crate::{
     dag::{
         anchor_election::RoundRobinAnchorElection,
         dag_driver::{DagDriver, DagDriverError},
-        dag_fetcher::DagFetcher,
+        dag_fetcher::DagFetcherService,
         dag_network::{RpcWithFallback, TDAGNetworkSender},
         dag_store::Dag,
         order_rule::OrderRule,
@@ -100,7 +100,7 @@ fn test_certified_node_handler() {
         storage.clone(),
     );

-    let (_, fetch_requester, _, _) = DagFetcher::new(
+    let (_, fetch_requester, _, _) = DagFetcherService::new(
         epoch_state.clone(),
         network_sender,
         dag.clone(),
diff --git a/consensus/src/dag/types.rs b/consensus/src/dag/types.rs
index 405acff149701..65d0b81983731 100644
--- a/consensus/src/dag/types.rs
+++ b/consensus/src/dag/types.rs
@@ -527,21 +527,27 @@ where
     }
 }

+#[derive(Serialize, Deserialize, Clone, Debug)]
+pub enum RemoteFetchRequestTargets {
+    ByMetadata(Vec<NodeMetadata>),
+    ByHash(Vec<HashValue>),
+}
+
 /// Represents a request to fetch missing dependencies for `target`, `start_round` represents
 /// the first round we care about in the DAG, `exists_bitmask` is a two dimensional bitmask represents
 /// if a node exist at [start_round + index][validator_index].
 #[derive(Serialize, Deserialize, Clone, Debug)]
 pub struct RemoteFetchRequest {
     epoch: u64,
-    targets: Vec<NodeMetadata>,
+    targets: RemoteFetchRequestTargets,
     exists_bitmask: DagSnapshotBitmask,
 }

 impl RemoteFetchRequest {
-    pub fn new(epoch: u64, parents: Vec<NodeMetadata>, exists_bitmask: DagSnapshotBitmask) -> Self {
+    pub fn new(epoch: u64, targets: RemoteFetchRequestTargets, exists_bitmask: DagSnapshotBitmask) -> Self {
         Self {
             epoch,
-            targets: parents,
+            targets,
             exists_bitmask,
         }
     }
@@ -550,7 +556,7 @@ impl RemoteFetchRequest {
         self.epoch
     }

-    pub fn targets(&self) -> &[NodeMetadata] {
+    pub fn targets(&self) -> &RemoteFetchRequestTargets {
         &self.targets
     }
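
Note (not part of the diff): since `RemoteFetchRequest::targets()` now returns the `RemoteFetchRequestTargets` enum instead of `&[NodeMetadata]`, the responder side (`FetchRequestHandler`, unchanged in this excerpt) will have to branch on the variant before it can serve a request. A rough sketch of that dispatch, assuming it lives in the same `dag` module; resolving `ByHash` digests against the local store is left unimplemented because no such lookup exists in this diff:

    use super::types::{NodeMetadata, RemoteFetchRequest, RemoteFetchRequestTargets};

    fn requested_metadata(request: &RemoteFetchRequest) -> Vec<NodeMetadata> {
        match request.targets() {
            // Same information the old slice carried: explicit node metadata per target.
            RemoteFetchRequestTargets::ByMetadata(metadata) => metadata.clone(),
            // New state-sync oriented variant: only digests are known up front.
            RemoteFetchRequestTargets::ByHash(_digests) => {
                // e.g. resolve each digest against the local DAG store before replying
                unimplemented!("digest lookup is not part of this change")
            },
        }
    }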