From 023b879d5c0f683be8882bfc9c3b2b04a67594d7 Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Tue, 29 Aug 2023 22:02:19 -0700 Subject: [PATCH] [dag] preliminary state sync logic --- consensus/src/dag/dag_state_sync.rs | 180 ++++++++++++++++++++++++++++ consensus/src/dag/dag_store.rs | 108 ++++++++++++++++- consensus/src/dag/mod.rs | 1 + consensus/src/dag/types.rs | 32 ++++- 4 files changed, 316 insertions(+), 5 deletions(-) create mode 100644 consensus/src/dag/dag_state_sync.rs diff --git a/consensus/src/dag/dag_state_sync.rs b/consensus/src/dag/dag_state_sync.rs new file mode 100644 index 0000000000000..ff70d06754465 --- /dev/null +++ b/consensus/src/dag/dag_state_sync.rs @@ -0,0 +1,180 @@ +use super::{ + dag_fetcher::{DagFetcher, TDagFetcher}, + dag_store::Dag, + storage::DAGStorage, + types::{CertifiedNodeWithLedgerInfo, RemoteFetchRequest}, + TDAGNetworkSender, +}; +use crate::state_replication::StateComputer; +use anyhow::anyhow; +use aptos_infallible::RwLock; +use aptos_logger::error; +use aptos_time_service::TimeService; +use aptos_types::{ + epoch_change::EpochChangeProof, epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, +}; +use itertools::Itertools; +use std::sync::Arc; + +pub const DAG_WINDOW: u64 = 10; + +pub(super) struct StateSyncManager { + epoch_state: Arc, + network: Arc, + time_service: TimeService, + state_computer: Arc, + storage: Arc, + dag_store: Arc>, +} + +impl StateSyncManager { + pub fn new( + epoch_state: Arc, + network: Arc, + time_service: TimeService, + state_computer: Arc, + storage: Arc, + dag_store: Arc>, + ) -> Self { + Self { + epoch_state, + network, + time_service, + state_computer, + storage, + dag_store, + } + } + + pub async fn sync_to( + &self, + node: &CertifiedNodeWithLedgerInfo, + ) -> anyhow::Result>>> { + self.sync_to_highest_commit_cert(node.ledger_info()).await; + self.try_sync_to_highest_ordered_anchor(node).await + } + + /// Fast forward in the decoupled-execution pipeline if the block exists there + pub async fn sync_to_highest_commit_cert(&self, ledger_info: &LedgerInfoWithSignatures) { + let dag_reader = self.dag_store.read(); + + // if the anchor exists between ledger info round and highest ordered round + // Note: ledger info round <= highest ordered round + if dag_reader.highest_committed_round().unwrap_or_default() + < ledger_info.commit_info().round() + && dag_reader.exists_by_round_digest( + ledger_info.commit_info().round(), + ledger_info.ledger_info().consensus_data_hash(), + ) + && dag_reader.highest_ordered_round().unwrap_or_default() + >= ledger_info.commit_info().round() + { + self.network.send_commit_proof(ledger_info.clone()).await + } + } + + /// Check if we're far away from this ledger info and need to sync. + /// This ensures that the block referred by the ledger info is not in buffer manager. + pub fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool { + let dag_reader = self.dag_store.read(); + (dag_reader.highest_ordered_round().unwrap_or_default() < li.commit_info().round() + && !dag_reader.exists_by_round_digest( + li.commit_info().round(), + li.ledger_info().consensus_data_hash(), + )) + || dag_reader.highest_committed_round().unwrap_or_default() + 2 * DAG_WINDOW + < li.commit_info().round() + } + + pub async fn try_sync_to_highest_ordered_anchor( + &self, + node: &CertifiedNodeWithLedgerInfo, + ) -> anyhow::Result>>> { + // Check whether to actually sync + let commit_li = node.ledger_info(); + if !self.need_sync_for_ledger_info(commit_li) { + return Ok(None); + } + + let dag_fetcher = Arc::new(DagFetcher::new( + self.epoch_state.clone(), + self.network.clone(), + self.time_service.clone(), + )); + + self.sync_to_highest_ordered_anchor(node, dag_fetcher).await + } + + /// Note: Assumes that the sync checks have been done + pub async fn sync_to_highest_ordered_anchor( + &self, + node: &CertifiedNodeWithLedgerInfo, + dag_fetcher: Arc, + ) -> anyhow::Result>>> { + let commit_li = node.ledger_info(); + + // TODO: there is a case where DAG fetches missing nodes in window and a crash happens and when we restart, + // we end up with a gap between the DAG and we need to be smart enough to clean up the DAG before the gap. + + // Create a new DAG store and Fetch blocks + let target_round = node.round(); + let start_round = commit_li.commit_info().round().saturating_sub(DAG_WINDOW); + let sync_dag_store = Arc::new(RwLock::new(Dag::new( + self.epoch_state.clone(), + self.storage.clone(), + start_round, + ))); + let bitmask = { sync_dag_store.read().bitmask(target_round) }; + let request = RemoteFetchRequest::new( + self.epoch_state.epoch, + node.parents_metadata().cloned().collect_vec(), + bitmask, + ); + + let responders = node + .certificate() + .signatures() + .get_signers_addresses(&self.epoch_state.verifier.get_ordered_account_addresses()); + + match dag_fetcher + .fetch(request, responders, sync_dag_store.clone()) + .await + { + Ok(_) => {}, + Err(err) => { + error!("error fetching nodes {}", err); + return Err(err); + }, + } + + // State sync + self.state_computer.sync_to(commit_li.clone()).await?; + + { + let mut dag_writer = sync_dag_store.write(); + dag_writer.prune(); + if let Some(node_status) = dag_writer.get_node_ref_mut_by_round_digest( + commit_li.ledger_info().round(), + commit_li.ledger_info().consensus_data_hash(), + ) { + node_status.mark_as_committed(); + } else { + error!( + "node for commit ledger info does not exist in DAG: {}", + commit_li + ); + return Err(anyhow!("commit ledger info node not found")); + } + } + + if commit_li.ledger_info().ends_epoch() { + self.network + .send_epoch_change(EpochChangeProof::new( + vec![commit_li.clone()], + /* more = */ false, + )) + .await; + } + Ok(Some(sync_dag_store)) + } +} diff --git a/consensus/src/dag/dag_store.rs b/consensus/src/dag/dag_store.rs index 67fb60ba95228..dcefdd2a89ff6 100644 --- a/consensus/src/dag/dag_store.rs +++ b/consensus/src/dag/dag_store.rs @@ -36,6 +36,12 @@ impl NodeStatus { assert!(matches!(self, NodeStatus::Unordered(_))); *self = NodeStatus::Ordered(self.as_node().clone()); } + + pub fn mark_as_committed(&mut self) { + assert!(!matches!(self, NodeStatus::Committed(_))); + // TODO: try to avoid clone + *self = NodeStatus::Committed(self.as_node().clone()); + } } /// Data structure that stores the DAG representation, it maintains round based index. @@ -46,6 +52,7 @@ pub struct Dag { author_to_index: HashMap, storage: Arc, initial_round: Round, + epoch_state: Arc, } impl Dag { @@ -83,6 +90,7 @@ impl Dag { author_to_index, storage, initial_round, + epoch_state, } } @@ -112,8 +120,10 @@ impl Dag { let round = node.metadata().round(); ensure!(round >= self.lowest_round(), "round too low"); ensure!(round <= self.highest_round() + 1, "round too high"); - for parent in node.parents() { - ensure!(self.exists(parent.metadata()), "parent not exist"); + if round > self.lowest_round() { + for parent in node.parents() { + ensure!(self.exists(parent.metadata()), "parent not exist"); + } } let round_ref = self .nodes_by_round @@ -131,10 +141,21 @@ impl Dag { self.get_node_ref_by_metadata(metadata).is_some() } + pub fn exists_by_round_digest(&self, round: Round, digest: HashValue) -> bool { + self.get_node_by_round_digest(round, digest).is_some() + } + pub fn all_exists<'a>(&self, nodes: impl Iterator) -> bool { self.filter_missing(nodes).next().is_none() } + pub fn all_exists_by_round_author<'a>( + &self, + mut nodes: impl Iterator, + ) -> bool { + nodes.all(|(round, author)| self.get_node_ref(*round, author).is_some()) + } + pub fn filter_missing<'a, 'b>( &'b self, nodes: impl Iterator + 'b, @@ -142,22 +163,55 @@ impl Dag { nodes.filter(|node_metadata| !self.exists(node_metadata)) } + pub fn get_node_ref_by_round_digest( + &self, + round: Round, + digest: HashValue, + ) -> Option<&NodeStatus> { + self.get_round_iter(round)? + .find(|node_status| node_status.as_node().digest() == digest) + } + + pub fn get_node_ref_mut_by_round_digest( + &mut self, + round: Round, + digest: HashValue, + ) -> Option<&mut NodeStatus> { + self.get_round_iter_mut(round)? + .find(|node_status| node_status.as_node().digest() == digest) + } + fn get_node_ref_by_metadata(&self, metadata: &NodeMetadata) -> Option<&NodeStatus> { self.get_node_ref(metadata.round(), metadata.author()) } - fn get_node_ref(&self, round: Round, author: &Author) -> Option<&NodeStatus> { + pub fn get_node_ref(&self, round: Round, author: &Author) -> Option<&NodeStatus> { let index = self.author_to_index.get(author)?; let round_ref = self.nodes_by_round.get(&round)?; round_ref[*index].as_ref() } + pub fn get_node_ref_mut(&mut self, round: Round, author: &Author) -> Option<&mut NodeStatus> { + let index = self.author_to_index.get(author)?; + let round_ref = self.nodes_by_round.get_mut(&round)?; + round_ref[*index].as_mut() + } + fn get_round_iter(&self, round: Round) -> Option> { self.nodes_by_round .get(&round) .map(|round_ref| round_ref.iter().flatten()) } + fn get_round_iter_mut( + &mut self, + round: Round, + ) -> Option> { + self.nodes_by_round + .get_mut(&round) + .map(|round_ref| round_ref.iter_mut().flatten()) + } + pub fn get_node(&self, metadata: &NodeMetadata) -> Option> { self.get_node_ref_by_metadata(metadata) .map(|node_status| node_status.as_node().clone()) @@ -172,6 +226,15 @@ impl Dag { .map(|node_status| node_status.as_node()) } + pub fn get_node_by_round_digest( + &self, + round: Round, + digest: HashValue, + ) -> Option<&Arc> { + self.get_node_ref_by_round_digest(round, digest) + .map(|node_status| node_status.as_node()) + } + // TODO: I think we can cache votes in the NodeStatus::Unordered pub fn check_votes_for_node( &self, @@ -296,4 +359,43 @@ impl Dag { DagSnapshotBitmask::new(lowest_round, bitmask) } + + pub(super) fn prune(&mut self) { + let all_nodes = self.storage.get_certified_nodes().unwrap_or_default(); + let mut expired = vec![]; + for (digest, certified_node) in all_nodes { + if certified_node.metadata().epoch() != self.epoch_state.epoch + || certified_node.metadata().round() < self.initial_round + { + expired.push(digest); + self.nodes_by_round + .remove(&certified_node.metadata().round()); + } + } + if let Err(e) = self.storage.delete_certified_nodes(expired) { + error!("Error deleting expired nodes: {:?}", e); + } + } + + pub(super) fn highest_ordered_round(&self) -> Option { + for (round, round_nodes) in self.nodes_by_round.iter().rev() { + for maybe_node_status in round_nodes { + if matches!(maybe_node_status, Some(NodeStatus::Ordered(_))) { + return Some(*round); + } + } + } + None + } + + pub(super) fn highest_committed_round(&self) -> Option { + for (round, round_nodes) in self.nodes_by_round.iter() { + for maybe_node_status in round_nodes { + if matches!(maybe_node_status, Some(NodeStatus::Committed(_))) { + return Some(*round); + } + } + } + None + } } diff --git a/consensus/src/dag/mod.rs b/consensus/src/dag/mod.rs index 5b2c488ca84a4..91985936e74ff 100644 --- a/consensus/src/dag/mod.rs +++ b/consensus/src/dag/mod.rs @@ -16,6 +16,7 @@ mod storage; #[cfg(test)] mod tests; mod types; +mod dag_state_sync; pub use dag_network::{RpcHandler, RpcWithFallback, TDAGNetworkSender}; pub use types::{CertifiedNode, DAGMessage, DAGNetworkMessage, Extensions, Node, NodeId, Vote}; diff --git a/consensus/src/dag/types.rs b/consensus/src/dag/types.rs index 405acff149701..8e7e1ea6cf183 100644 --- a/consensus/src/dag/types.rs +++ b/consensus/src/dag/types.rs @@ -16,6 +16,7 @@ use aptos_reliable_broadcast::{BroadcastStatus, RBMessage}; use aptos_types::{ aggregate_signature::{AggregateSignature, PartialSignatures}, epoch_state::EpochState, + ledger_info::LedgerInfoWithSignatures, validator_signer::ValidatorSigner, validator_verifier::ValidatorVerifier, }; @@ -418,6 +419,33 @@ impl TDAGMessage for CertifiedNode { } } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct CertifiedNodeWithLedgerInfo { + inner: CertifiedNode, + ledger_info: LedgerInfoWithSignatures, +} + +impl CertifiedNodeWithLedgerInfo { + pub fn new(node: CertifiedNode, ledger_info: LedgerInfoWithSignatures) -> Self { + Self { + inner: node, + ledger_info, + } + } + + pub fn ledger_info(&self) -> &LedgerInfoWithSignatures { + &self.ledger_info + } +} + +impl Deref for CertifiedNodeWithLedgerInfo { + type Target = CertifiedNode; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct Vote { metadata: NodeMetadata, @@ -538,10 +566,10 @@ pub struct RemoteFetchRequest { } impl RemoteFetchRequest { - pub fn new(epoch: u64, parents: Vec, exists_bitmask: DagSnapshotBitmask) -> Self { + pub fn new(epoch: u64, targets: Vec, exists_bitmask: DagSnapshotBitmask) -> Self { Self { epoch, - targets: parents, + targets, exists_bitmask, } }