Skip to content

Commit

Permalink
[dag] wip state sync
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 24, 2023
1 parent 2bcdbd4 commit d4409ca
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 74 deletions.
4 changes: 2 additions & 2 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use super::{
anchor_election::RoundRobinAnchorElection,
dag_driver::DagDriver,
dag_fetcher::{DagFetcher, FetchRequestHandler},
dag_fetcher::{DagFetcherService, FetchRequestHandler},
dag_handler::NetworkHandler,
dag_network::TDAGNetworkSender,
dag_store::Dag,
Expand Down Expand Up @@ -76,7 +76,7 @@ pub fn bootstrap_dag(
);

let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) =
DagFetcher::new(
DagFetcherService::new(
epoch_state.clone(),
dag_network_sender,
dag.clone(),
Expand Down
177 changes: 111 additions & 66 deletions consensus/src/dag/dag_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::dag::{
dag_store::Dag,
types::{CertifiedNode, FetchResponse, Node, RemoteFetchRequest},
};
use anyhow::ensure;
use aptos_consensus_types::common::Author;
use anyhow::{anyhow, ensure};
use aptos_consensus_types::common::{Author, Round};
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_time_service::TimeService;
Expand Down Expand Up @@ -124,15 +124,13 @@ impl LocalFetchRequest {
}
}

pub struct DagFetcher {
epoch_state: Arc<EpochState>,
network: Arc<dyn TDAGNetworkSender>,
dag: Arc<RwLock<Dag>>,
pub struct DagFetcherService {
inner: DagFetcher,
ordered_authors: Vec<Author>,
request_rx: Receiver<LocalFetchRequest>,
time_service: TimeService,
}

impl DagFetcher {
impl DagFetcherService {
pub fn new(
epoch_state: Arc<EpochState>,
network: Arc<dyn TDAGNetworkSender>,
Expand All @@ -147,13 +145,12 @@ impl DagFetcher {
let (request_tx, request_rx) = tokio::sync::mpsc::channel(16);
let (node_tx, node_rx) = tokio::sync::mpsc::channel(100);
let (certified_node_tx, certified_node_rx) = tokio::sync::mpsc::channel(100);
let ordered_authors = epoch_state.verifier.get_ordered_account_addresses();
(
Self {
epoch_state,
network,
dag,
inner: DagFetcher::new(epoch_state, network, dag, time_service),
request_rx,
time_service,
ordered_authors,
},
FetchRequester {
request_tx,
Expand All @@ -167,68 +164,116 @@ impl DagFetcher {

pub async fn start(mut self) {
while let Some(local_request) = self.request_rx.recv().await {
let responders = local_request
.responders(&self.epoch_state.verifier.get_ordered_account_addresses());
let remote_request = {
let dag_reader = self.dag.read();

let missing_parents: Vec<NodeMetadata> = dag_reader
.filter_missing(local_request.node().parents_metadata())
.cloned()
.collect();

if missing_parents.is_empty() {
local_request.notify();
continue;
}

let target = local_request.node();
RemoteFetchRequest::new(
target.metadata().epoch(),
missing_parents,
dag_reader.bitmask(local_request.node().round()),
match self
.fetch_for_node(
local_request.node(),
local_request.responders(&self.ordered_authors),
)
};

let mut rpc = RpcWithFallback::new(
responders,
remote_request.clone().into(),
Duration::from_millis(500),
Duration::from_secs(1),
self.network.clone(),
self.time_service.clone(),
);
while let Some(response) = rpc.next().await {
if let Ok(response) =
response
.and_then(FetchResponse::try_from)
.and_then(|response| {
response.verify(&remote_request, &self.epoch_state.verifier)
})
.await
{
Ok(_) => local_request.notify(),
Err(err) => error!("unable to complete fetch successfully: {}", err),
}
}
}

/// Fetches whichever parents of `node` are absent from the local DAG.
///
/// Returns `Ok(())` straight away when no parent is missing. Otherwise a
/// `RemoteFetchRequest` is built from the missing parents plus the DAG bitmask for
/// `node.round()`, and the inner fetcher is asked to satisfy it from `responders`; its
/// result is returned unchanged. Completion is judged against *all* parents of `node`
/// (as `(round, author)` pairs), not only the ones that were missing at request time.
///
/// NOTE(review): elsewhere in this change `RemoteFetchRequest::new` is shown taking a
/// `RemoteFetchRequestTargets`; confirm a bare `Vec<NodeMetadata>` is still accepted here.
pub(super) async fn fetch_for_node(
    &mut self,
    node: &Node,
    responders: Vec<Author>,
) -> anyhow::Result<()> {
    // The read guard lives only inside this block, so it is released before the await below.
    let request = {
        let dag = self.inner.dag.read();
        let absent: Vec<NodeMetadata> =
            dag.filter_missing(node.parents_metadata()).cloned().collect();
        if absent.is_empty() {
            return Ok(());
        }
        let epoch = node.metadata().epoch();
        let bitmask = dag.bitmask(node.round());
        RemoteFetchRequest::new(epoch, absent, bitmask)
    };

    let parent_keys = node
        .parents_metadata()
        .map(|parent| (parent.round(), *parent.author()))
        .collect();

    self.inner.fetch(request, responders, parent_keys).await
}
}

/// Issues a remote fetch request and stores the certified nodes from verified responses
/// in the DAG. Unlike `DagFetcherService`, it owns no request queue; callers invoke
/// `fetch` directly.
pub(crate) struct DagFetcher {
/// Network sender handed to `RpcWithFallback` when a fetch is issued.
network: Arc<dyn TDAGNetworkSender>,
/// Time service handed to `RpcWithFallback` alongside the network sender.
time_service: TimeService,
/// Epoch state whose `verifier` is used to verify fetch responses.
epoch_state: Arc<EpochState>,
/// DAG that fetched certified nodes are added to and that is checked for completion.
dag: Arc<RwLock<Dag>>,
}

impl DagFetcher {
/// Assembles a fetcher from its four collaborators.
///
/// The arguments are stored as given; nothing else happens during construction.
pub(crate) fn new(
    epoch_state: Arc<EpochState>,
    network: Arc<dyn TDAGNetworkSender>,
    dag: Arc<RwLock<Dag>>,
    time_service: TimeService,
) -> Self {
    // Fields are listed in parameter order; every one is a plain move.
    Self {
        epoch_state,
        network,
        dag,
        time_service,
    }
}

pub(crate) async fn fetch<'a>(
&self,
remote_request: RemoteFetchRequest,
responders: Vec<Author>,
target_metadata: Vec<(Round, Author)>,
) -> anyhow::Result<()> {
let mut rpc = RpcWithFallback::new(
responders,
remote_request.clone().into(),
Duration::from_millis(500),
Duration::from_secs(1),
self.network.clone(),
self.time_service.clone(),
);

// TODO retry
while let Some(response) = rpc.next().await {
if let Ok(response) = response
.and_then(FetchResponse::try_from)
.and_then(|response| response.verify(&remote_request, &self.epoch_state.verifier))
{
let certified_nodes = response.certified_nodes();
// TODO: support chunk response or fallback to state sync
{
let certified_nodes = response.certified_nodes();
// TODO: support chunk response or fallback to state sync
{
let mut dag_writer = self.dag.write();
for node in certified_nodes {
if let Err(e) = dag_writer.add_node(node) {
error!("Failed to add node {}", e);
}
let mut dag_writer = self.dag.write();
for node in certified_nodes {
if let Err(e) = dag_writer.add_node(node) {
error!("Failed to add node {}", e);
}
}
}

if self
.dag
.read()
.all_exists(local_request.node().parents_metadata())
{
local_request.notify();
break;
}
if self
.dag
.read()
.all_exists_by_round_author(target_metadata.iter())
{
return Ok(());
}
}
// TODO retry
}
Err(anyhow!("fetch failed"))
}
}

Expand Down
4 changes: 4 additions & 0 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ impl Dag {
self.filter_missing(nodes).next().is_none()
}

/// Reports whether the DAG holds a node for every `(round, author)` pair yielded by `nodes`.
///
/// Each pair is looked up with `get_node_ref`; iteration stops at the first pair that has
/// no node. An empty iterator yields `true`.
pub fn all_exists_by_round_author<'a>(
    &self,
    mut nodes: impl Iterator<Item = &'a (Round, Author)>,
) -> bool {
    // "All present" phrased as "none absent"; `any` short-circuits on the first miss.
    let found_gap = nodes.any(|(round, author)| self.get_node_ref(*round, author).is_none());
    !found_gap
}

pub fn filter_missing<'a, 'b>(
&'b self,
nodes: impl Iterator<Item = &'a NodeMetadata> + 'b,
Expand Down
1 change: 1 addition & 0 deletions consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod storage;
#[cfg(test)]
mod tests;
mod types;
mod state_sync;

pub use dag_network::{RpcHandler, RpcWithFallback, TDAGNetworkSender};
pub use types::{CertifiedNode, DAGMessage, DAGNetworkMessage, Extensions, Node, NodeId, Vote};
54 changes: 54 additions & 0 deletions consensus/src/dag/state_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use super::{dag_fetcher::DagFetcher, dag_store::Dag, storage::DAGStorage, TDAGNetworkSender, types::{RemoteFetchRequest, NodeMetadata}, NodeId};
use crate::state_replication::StateComputer;
use aptos_infallible::RwLock;
use aptos_time_service::TimeService;
use aptos_types::{epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures};
use std::sync::Arc;

/// Number of rounds subtracted (saturating at zero) from the sync target's round to get
/// the initial round of the DAG that `sync_to_ledger_info` creates.
pub const DAG_WINDOW: u64 = 10;

/// Bundles what `sync_to_ledger_info` needs: the pieces to build a new DAG and a
/// `DagFetcher`, plus the state computer that syncs to the target ledger info.
struct StateSyncAdapter {
/// Passed to both `Dag::new` and `DagFetcher::new`; also supplies the request epoch.
epoch_state: Arc<EpochState>,
/// Network sender passed to `DagFetcher::new`.
network: Arc<dyn TDAGNetworkSender>,
/// Time service passed to `DagFetcher::new`.
time_service: TimeService,
/// Used to call `sync_to` with the target ledger info.
state_computer: Arc<dyn StateComputer>,
/// Storage passed to `Dag::new` for the newly created DAG.
storage: Arc<dyn DAGStorage>,
}

impl StateSyncAdapter {
/// Work in progress: syncs this node to `target`.
///
/// As written, it creates a new DAG whose initial round is `DAG_WINDOW` rounds before the
/// target round, creates a `DagFetcher` over that DAG, sets up a remote fetch, and then
/// calls `state_computer.sync_to(target)`. The fetch section is unfinished and does not
/// compile yet; see the NOTE(review) comments below.
///
/// Returns the error from `state_computer.sync_to` if that call fails.
async fn sync_to_ledger_info(&self, target: LedgerInfoWithSignatures) -> anyhow::Result<()> {
// Check whether to actually sync
// NOTE(review): no such check is implemented yet; the function always proceeds.

// TODO: there is a case where the DAG fetches missing nodes in the window, a crash happens, and on restart
// we end up with a gap in the DAG; we need to be smart enough to clean up the DAG before the gap.

// Create a new DAG store and fetch blocks.
// The new DAG starts DAG_WINDOW rounds before the target round (saturating at 0).
let target_round = target.ledger_info().round();
let initial_round = target_round.saturating_sub(DAG_WINDOW);
let dag = Arc::new(RwLock::new(Dag::new(
self.epoch_state.clone(),
self.storage.clone(),
initial_round,
)));
// NOTE(review): `dag` is moved into the fetcher here but read again below
// (`dag.read()`); passing `dag.clone()` would keep a handle for the later use.
let fetcher = DagFetcher::new(
self.epoch_state.clone(),
self.network.clone(),
dag,
self.time_service.clone(),
);

// NOTE(review): unfinished statement with no right-hand side. Once completed, it would
// also shadow the `target` parameter that `sync_to` below still needs.
let target =

// NOTE(review): `targets` and `responders` are not defined anywhere in this function.
let request = RemoteFetchRequest::new(
self.epoch_state.epoch,
targets,
dag.read().bitmask(target_round),
);
// NOTE(review): `DagFetcher::fetch` is an async fn returning `anyhow::Result<()>`; the
// future is neither awaited nor checked here, so no fetch would run. Its third
// parameter is a `Vec<(Round, Author)>`, which `target` is not.
fetcher.fetch(request, responders, target);

// State sync: hand the target ledger info to the state computer and propagate any error.
self.state_computer.sync_to(target).await?;

Ok(())
}
}
4 changes: 2 additions & 2 deletions consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
dag::{
anchor_election::RoundRobinAnchorElection,
dag_driver::{DagDriver, DagDriverError},
dag_fetcher::DagFetcher,
dag_fetcher::DagFetcherService,
dag_network::{RpcWithFallback, TDAGNetworkSender},
dag_store::Dag,
order_rule::OrderRule,
Expand Down Expand Up @@ -100,7 +100,7 @@ fn test_certified_node_handler() {
storage.clone(),
);

let (_, fetch_requester, _, _) = DagFetcher::new(
let (_, fetch_requester, _, _) = DagFetcherService::new(
epoch_state.clone(),
network_sender,
dag.clone(),
Expand Down
14 changes: 10 additions & 4 deletions consensus/src/dag/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,21 +527,27 @@ where
}
}

/// The set of nodes a `RemoteFetchRequest` asks for, given in one of two forms.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum RemoteFetchRequestTargets {
/// Targets given as `NodeMetadata` entries.
ByMetadata(Vec<NodeMetadata>),
/// Targets given as hash values (presumably node digests; TODO confirm).
ByHash(Vec<HashValue>),
}

/// Represents a request to fetch missing dependencies for `target`. `start_round` is
/// the first round we care about in the DAG; `exists_bitmask` is a two-dimensional bitmask that
/// records whether a node exists at [start_round + index][validator_index].
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct RemoteFetchRequest {
epoch: u64,
targets: Vec<NodeMetadata>,
targets: RemoteFetchRequestTargets,
exists_bitmask: DagSnapshotBitmask,
}

impl RemoteFetchRequest {
pub fn new(epoch: u64, parents: Vec<NodeMetadata>, exists_bitmask: DagSnapshotBitmask) -> Self {
pub fn new(epoch: u64, targets: RemoteFetchRequestTargets, exists_bitmask: DagSnapshotBitmask) -> Self {
Self {
epoch,
targets: parents,
targets,
exists_bitmask,
}
}
Expand All @@ -550,7 +556,7 @@ impl RemoteFetchRequest {
self.epoch
}

pub fn targets(&self) -> &[NodeMetadata] {
pub fn targets(&self) -> &RemoteFetchRequestTargets {
&self.targets
}

Expand Down

0 comments on commit d4409ca

Please sign in to comment.