Skip to content

Commit

Permalink
[dag] preliminary state sync logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 30, 2023
1 parent dc5f1e6 commit 023b879
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 5 deletions.
180 changes: 180 additions & 0 deletions consensus/src/dag/dag_state_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
use super::{
dag_fetcher::{DagFetcher, TDagFetcher},
dag_store::Dag,
storage::DAGStorage,
types::{CertifiedNodeWithLedgerInfo, RemoteFetchRequest},
TDAGNetworkSender,
};
use crate::state_replication::StateComputer;
use anyhow::anyhow;
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_time_service::TimeService;
use aptos_types::{
epoch_change::EpochChangeProof, epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures,
};
use itertools::Itertools;
use std::sync::Arc;

/// Number of DAG rounds retained below a commit; used to size the fetch
/// window during state sync (see `sync_to_highest_ordered_anchor`).
pub const DAG_WINDOW: u64 = 10;

/// Drives DAG state synchronization: decides whether this validator is behind
/// a given commit ledger info and, if so, fetches the missing DAG window from
/// peers and state-syncs execution up to the commit.
pub(super) struct StateSyncManager {
    // Epoch number, verifier, and validator addresses for the current epoch.
    epoch_state: Arc<EpochState>,
    // Network handle used to send commit proofs and epoch-change notifications.
    network: Arc<dyn TDAGNetworkSender>,
    // Clock abstraction handed to the `DagFetcher` (presumably for timeouts/retries
    // — confirm against `DagFetcher::new`).
    time_service: TimeService,
    // Execution-side interface; `sync_to` is invoked on it to catch up state.
    state_computer: Arc<dyn StateComputer>,
    // Persistent storage backing newly created DAG stores.
    storage: Arc<dyn DAGStorage>,
    // The current in-memory DAG, read to decide whether syncing is needed.
    dag_store: Arc<RwLock<Dag>>,
}

impl StateSyncManager {
    /// Constructs a manager from its collaborators; performs no work itself.
    pub fn new(
        epoch_state: Arc<EpochState>,
        network: Arc<dyn TDAGNetworkSender>,
        time_service: TimeService,
        state_computer: Arc<dyn StateComputer>,
        storage: Arc<dyn DAGStorage>,
        dag_store: Arc<RwLock<Dag>>,
    ) -> Self {
        Self {
            epoch_state,
            network,
            time_service,
            state_computer,
            storage,
            dag_store,
        }
    }

    /// Entry point for syncing toward `node`'s commit ledger info.
    ///
    /// First forwards the commit proof to the execution pipeline when the
    /// committed block is already ordered locally, then performs a full DAG
    /// fetch + state sync if we are actually behind. Returns the freshly
    /// built DAG store when a sync happened, `Ok(None)` when none was needed.
    pub async fn sync_to(
        &self,
        node: &CertifiedNodeWithLedgerInfo,
    ) -> anyhow::Result<Option<Arc<RwLock<Dag>>>> {
        self.sync_to_highest_commit_cert(node.ledger_info()).await;
        self.try_sync_to_highest_ordered_anchor(node).await
    }

    /// Fast forward in the decoupled-execution pipeline if the block exists there
    pub async fn sync_to_highest_commit_cert(&self, ledger_info: &LedgerInfoWithSignatures) {
        let dag_reader = self.dag_store.read();

        // if the anchor exists between ledger info round and highest ordered round
        // Note: ledger info round <= highest ordered round
        if dag_reader.highest_committed_round().unwrap_or_default()
            < ledger_info.commit_info().round()
            && dag_reader.exists_by_round_digest(
                ledger_info.commit_info().round(),
                ledger_info.ledger_info().consensus_data_hash(),
            )
            && dag_reader.highest_ordered_round().unwrap_or_default()
                >= ledger_info.commit_info().round()
        {
            // The commit round is ordered locally but not yet committed:
            // hand the proof to the pipeline instead of state-syncing.
            self.network.send_commit_proof(ledger_info.clone()).await
        }
    }

    /// Check if we're far away from this ledger info and need to sync.
    /// This ensures that the block referred by the ledger info is not in buffer manager.
    pub fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool {
        let dag_reader = self.dag_store.read();
        // Sync when the commit round is beyond anything we have ordered (and the
        // node itself is absent), or when our committed round trails the commit
        // by more than two DAG windows.
        (dag_reader.highest_ordered_round().unwrap_or_default() < li.commit_info().round()
            && !dag_reader.exists_by_round_digest(
                li.commit_info().round(),
                li.ledger_info().consensus_data_hash(),
            ))
            || dag_reader.highest_committed_round().unwrap_or_default() + 2 * DAG_WINDOW
                < li.commit_info().round()
    }

    /// Runs the sync-needed check and, when required, performs the sync with a
    /// freshly constructed `DagFetcher`. Returns `Ok(None)` when up to date.
    pub async fn try_sync_to_highest_ordered_anchor(
        &self,
        node: &CertifiedNodeWithLedgerInfo,
    ) -> anyhow::Result<Option<Arc<RwLock<Dag>>>> {
        // Check whether to actually sync
        let commit_li = node.ledger_info();
        if !self.need_sync_for_ledger_info(commit_li) {
            return Ok(None);
        }

        let dag_fetcher = Arc::new(DagFetcher::new(
            self.epoch_state.clone(),
            self.network.clone(),
            self.time_service.clone(),
        ));

        self.sync_to_highest_ordered_anchor(node, dag_fetcher).await
    }

    /// Fetches the DAG window ending at `node`'s round into a brand-new DAG
    /// store, state-syncs execution to the commit ledger info, marks the
    /// committed node, and broadcasts an epoch-change proof when the commit
    /// ends the epoch. On success returns the new DAG store for the caller to
    /// swap in.
    ///
    /// Note: Assumes that the sync checks have been done
    pub async fn sync_to_highest_ordered_anchor(
        &self,
        node: &CertifiedNodeWithLedgerInfo,
        dag_fetcher: Arc<impl TDagFetcher>,
    ) -> anyhow::Result<Option<Arc<RwLock<Dag>>>> {
        let commit_li = node.ledger_info();

        // TODO: there is a case where DAG fetches missing nodes in window and a crash happens and when we restart,
        // we end up with a gap between the DAG and we need to be smart enough to clean up the DAG before the gap.

        // Create a new DAG store and Fetch blocks
        let target_round = node.round();
        // Keep DAG_WINDOW rounds of history below the commit round.
        let start_round = commit_li.commit_info().round().saturating_sub(DAG_WINDOW);
        let sync_dag_store = Arc::new(RwLock::new(Dag::new(
            self.epoch_state.clone(),
            self.storage.clone(),
            start_round,
        )));
        // Bitmask of (round, author) slots already present, up to the target round;
        // scoped block so the read lock is dropped before the fetch.
        let bitmask = { sync_dag_store.read().bitmask(target_round) };
        let request = RemoteFetchRequest::new(
            self.epoch_state.epoch,
            node.parents_metadata().cloned().collect_vec(),
            bitmask,
        );

        // Only validators that signed the certificate are known to hold the data.
        let responders = node
            .certificate()
            .signatures()
            .get_signers_addresses(&self.epoch_state.verifier.get_ordered_account_addresses());

        match dag_fetcher
            .fetch(request, responders, sync_dag_store.clone())
            .await
        {
            Ok(_) => {},
            Err(err) => {
                error!("error fetching nodes {}", err);
                return Err(err);
            },
        }

        // State sync
        self.state_computer.sync_to(commit_li.clone()).await?;

        {
            let mut dag_writer = sync_dag_store.write();
            // Drop stale (old-epoch / below-window) nodes before marking the commit.
            dag_writer.prune();
            if let Some(node_status) = dag_writer.get_node_ref_mut_by_round_digest(
                commit_li.ledger_info().round(),
                commit_li.ledger_info().consensus_data_hash(),
            ) {
                node_status.mark_as_committed();
            } else {
                // The fetched window must contain the committed node; if not,
                // the sync cannot be considered successful.
                error!(
                    "node for commit ledger info does not exist in DAG: {}",
                    commit_li
                );
                return Err(anyhow!("commit ledger info node not found"));
            }
        }

        if commit_li.ledger_info().ends_epoch() {
            self.network
                .send_epoch_change(EpochChangeProof::new(
                    vec![commit_li.clone()],
                    /* more = */ false,
                ))
                .await;
        }
        Ok(Some(sync_dag_store))
    }
}
108 changes: 105 additions & 3 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ impl NodeStatus {
assert!(matches!(self, NodeStatus::Unordered(_)));
*self = NodeStatus::Ordered(self.as_node().clone());
}

/// Transitions this status to `Committed`, carrying over the same node.
/// Panics if the status is already `Committed`.
pub fn mark_as_committed(&mut self) {
    assert!(!matches!(self, NodeStatus::Committed(_)));
    // TODO: try to avoid clone
    let node = self.as_node().clone();
    *self = NodeStatus::Committed(node);
}
}

/// Data structure that stores the DAG representation, it maintains round based index.
Expand All @@ -46,6 +52,7 @@ pub struct Dag {
author_to_index: HashMap<Author, usize>,
storage: Arc<dyn DAGStorage>,
initial_round: Round,
epoch_state: Arc<EpochState>,
}

impl Dag {
Expand Down Expand Up @@ -83,6 +90,7 @@ impl Dag {
author_to_index,
storage,
initial_round,
epoch_state,
}
}

Expand Down Expand Up @@ -112,8 +120,10 @@ impl Dag {
let round = node.metadata().round();
ensure!(round >= self.lowest_round(), "round too low");
ensure!(round <= self.highest_round() + 1, "round too high");
for parent in node.parents() {
ensure!(self.exists(parent.metadata()), "parent not exist");
if round > self.lowest_round() {
for parent in node.parents() {
ensure!(self.exists(parent.metadata()), "parent not exist");
}
}
let round_ref = self
.nodes_by_round
Expand All @@ -131,33 +141,77 @@ impl Dag {
self.get_node_ref_by_metadata(metadata).is_some()
}

/// Returns true when a node with `digest` is present at `round`.
pub fn exists_by_round_digest(&self, round: Round, digest: HashValue) -> bool {
    matches!(self.get_node_by_round_digest(round, digest), Some(_))
}

/// Returns true iff every given metadata resolves to a node already in the DAG.
pub fn all_exists<'a>(&self, nodes: impl Iterator<Item = &'a NodeMetadata>) -> bool {
    let mut nodes = nodes;
    nodes.all(|metadata| self.exists(metadata))
}

/// Returns true iff every (round, author) pair has a node in the DAG.
pub fn all_exists_by_round_author<'a>(
    &self,
    nodes: impl Iterator<Item = &'a (Round, Author)>,
) -> bool {
    for (round, author) in nodes {
        if self.get_node_ref(*round, author).is_none() {
            return false;
        }
    }
    true
}

/// Lazily yields only those metadata entries whose node is absent from the DAG.
pub fn filter_missing<'a, 'b>(
    &'b self,
    nodes: impl Iterator<Item = &'a NodeMetadata> + 'b,
) -> impl Iterator<Item = &'a NodeMetadata> + 'b {
    nodes.filter(|metadata| self.get_node_ref_by_metadata(metadata).is_none())
}

/// Scans the given round for a node whose digest matches, returning its status.
pub fn get_node_ref_by_round_digest(
    &self,
    round: Round,
    digest: HashValue,
) -> Option<&NodeStatus> {
    let mut round_nodes = self.get_round_iter(round)?;
    round_nodes.find(|status| status.as_node().digest() == digest)
}

/// Mutable counterpart of `get_node_ref_by_round_digest`.
pub fn get_node_ref_mut_by_round_digest(
    &mut self,
    round: Round,
    digest: HashValue,
) -> Option<&mut NodeStatus> {
    let mut round_nodes = self.get_round_iter_mut(round)?;
    round_nodes.find(|status| status.as_node().digest() == digest)
}

/// Resolves a metadata's (round, author) pair to its node status, if present.
fn get_node_ref_by_metadata(&self, metadata: &NodeMetadata) -> Option<&NodeStatus> {
    let (round, author) = (metadata.round(), metadata.author());
    self.get_node_ref(round, author)
}

fn get_node_ref(&self, round: Round, author: &Author) -> Option<&NodeStatus> {
pub fn get_node_ref(&self, round: Round, author: &Author) -> Option<&NodeStatus> {
let index = self.author_to_index.get(author)?;
let round_ref = self.nodes_by_round.get(&round)?;
round_ref[*index].as_ref()
}

/// Mutable counterpart of `get_node_ref`.
pub fn get_node_ref_mut(&mut self, round: Round, author: &Author) -> Option<&mut NodeStatus> {
    let index = *self.author_to_index.get(author)?;
    self.nodes_by_round.get_mut(&round)?[index].as_mut()
}

/// Iterates over the occupied node slots of `round`, skipping empty ones.
fn get_round_iter(&self, round: Round) -> Option<impl Iterator<Item = &NodeStatus>> {
    let slots = self.nodes_by_round.get(&round)?;
    Some(slots.iter().flatten())
}

/// Mutable counterpart of `get_round_iter`.
fn get_round_iter_mut(
    &mut self,
    round: Round,
) -> Option<impl Iterator<Item = &mut NodeStatus>> {
    let slots = self.nodes_by_round.get_mut(&round)?;
    Some(slots.iter_mut().flatten())
}

pub fn get_node(&self, metadata: &NodeMetadata) -> Option<Arc<CertifiedNode>> {
self.get_node_ref_by_metadata(metadata)
.map(|node_status| node_status.as_node().clone())
Expand All @@ -172,6 +226,15 @@ impl Dag {
.map(|node_status| node_status.as_node())
}

/// Returns the certified node at `round` with the given digest, if present.
pub fn get_node_by_round_digest(
    &self,
    round: Round,
    digest: HashValue,
) -> Option<&Arc<CertifiedNode>> {
    let status = self.get_node_ref_by_round_digest(round, digest)?;
    Some(status.as_node())
}

// TODO: I think we can cache votes in the NodeStatus::Unordered
pub fn check_votes_for_node(
&self,
Expand Down Expand Up @@ -296,4 +359,43 @@ impl Dag {

DagSnapshotBitmask::new(lowest_round, bitmask)
}

/// Deletes stale certified nodes from persistent storage — anything from a
/// different epoch or below `initial_round` — and drops the corresponding
/// rounds from the in-memory index. Storage errors are logged, not propagated.
pub(super) fn prune(&mut self) {
    let all_nodes = self.storage.get_certified_nodes().unwrap_or_default();
    let mut expired = vec![];
    for (digest, certified_node) in all_nodes {
        if certified_node.metadata().epoch() != self.epoch_state.epoch
            || certified_node.metadata().round() < self.initial_round
        {
            expired.push(digest);
            // NOTE(review): this removes the ENTIRE in-memory round entry. If a
            // stale node from a previous epoch shares a round number with live
            // current-epoch nodes, those live nodes are evicted too — confirm
            // that situation cannot arise when prune() is called.
            self.nodes_by_round
                .remove(&certified_node.metadata().round());
        }
    }
    if let Err(e) = self.storage.delete_certified_nodes(expired) {
        error!("Error deleting expired nodes: {:?}", e);
    }
}

/// Returns the highest round containing at least one `Ordered` node, scanning
/// rounds from highest to lowest.
pub(super) fn highest_ordered_round(&self) -> Option<Round> {
    self.nodes_by_round
        .iter()
        .rev()
        .find(|(_, slots)| {
            slots
                .iter()
                .flatten()
                .any(|status| matches!(status, NodeStatus::Ordered(_)))
        })
        .map(|(round, _)| *round)
}

/// Returns the highest round containing at least one `Committed` node.
///
/// Fix: the scan previously iterated `nodes_by_round` in ascending order, so
/// the first match was the *lowest* committed round — contradicting both the
/// function name and the symmetric `highest_ordered_round`, which scans with
/// `.rev()`. Iterate in descending round order instead.
pub(super) fn highest_committed_round(&self) -> Option<Round> {
    for (round, round_nodes) in self.nodes_by_round.iter().rev() {
        for maybe_node_status in round_nodes {
            if matches!(maybe_node_status, Some(NodeStatus::Committed(_))) {
                return Some(*round);
            }
        }
    }
    None
}
}
1 change: 1 addition & 0 deletions consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod storage;
#[cfg(test)]
mod tests;
mod types;
mod dag_state_sync;

pub use dag_network::{RpcHandler, RpcWithFallback, TDAGNetworkSender};
pub use types::{CertifiedNode, DAGMessage, DAGNetworkMessage, Extensions, Node, NodeId, Vote};
Loading

0 comments on commit 023b879

Please sign in to comment.