diff --git a/consensus/src/dag/tests/dag_state_sync_tests.rs b/consensus/src/dag/tests/dag_state_sync_tests.rs new file mode 100644 index 0000000000000..4644590ef78ce --- /dev/null +++ b/consensus/src/dag/tests/dag_state_sync_tests.rs @@ -0,0 +1,216 @@ +// Copyright © Aptos Foundation + +use crate::{ + dag::{ + adapter::Notifier, + dag_fetcher::{FetchRequestHandler, TDagFetcher}, + dag_state_sync::{StateSyncManager, DAG_WINDOW}, + dag_store::Dag, + storage::DAGStorage, + tests::{dag_test::MockStorage, helpers::generate_dag_nodes}, + types::{CertifiedNodeMessage, RemoteFetchRequest}, + CertifiedNode, DAGMessage, RpcHandler, RpcWithFallback, TDAGNetworkSender, + }, + test_utils::EmptyStateComputer, +}; +use aptos_consensus_types::common::{Author, Round}; +use aptos_crypto::HashValue; +use aptos_infallible::RwLock; +use aptos_reliable_broadcast::RBNetworkSender; +use aptos_time_service::TimeService; +use aptos_types::{ + aggregate_signature::AggregateSignature, + block_info::BlockInfo, + epoch_change::EpochChangeProof, + epoch_state::EpochState, + ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, + validator_verifier::random_validator_verifier, +}; +use async_trait::async_trait; +use claims::assert_none; +use std::{sync::Arc, time::Duration}; + +struct MockDAGNetworkSender {} + +#[async_trait] +impl RBNetworkSender for MockDAGNetworkSender { + async fn send_rb_rpc( + &self, + _receiver: Author, + _message: DAGMessage, + _timeout: Duration, + ) -> anyhow::Result { + unimplemented!() + } +} + +#[async_trait] +impl TDAGNetworkSender for MockDAGNetworkSender { + async fn send_rpc( + &self, + _receiver: Author, + _message: DAGMessage, + _timeout: Duration, + ) -> anyhow::Result { + unimplemented!() + } + + /// Given a list of potential responders, sending rpc to get response from any of them and could + /// fallback to more in case of failures. + async fn send_rpc_with_fallbacks( + &self, + _responders: Vec, + _message: DAGMessage, + _retry_interval: Duration, + _rpc_timeout: Duration, + ) -> RpcWithFallback { + unimplemented!() + } +} + +struct MockDagFetcher { + target_dag: Arc>, + epoch_state: Arc, +} + +#[async_trait] +impl TDagFetcher for MockDagFetcher { + async fn fetch( + &self, + remote_request: RemoteFetchRequest, + _responders: Vec, + new_dag: Arc>, + ) -> anyhow::Result<()> { + let response = FetchRequestHandler::new(self.target_dag.clone(), self.epoch_state.clone()) + .process(remote_request) + .unwrap(); + + let mut new_dag_writer = new_dag.write(); + + for node in response.certified_nodes().into_iter().rev() { + new_dag_writer.add_node(node).unwrap() + } + + Ok(()) + } +} + +struct MockNotifier {} + +#[async_trait] +impl Notifier for MockNotifier { + fn send_ordered_nodes( + &mut self, + _ordered_nodes: Vec>, + _failed_author: Vec<(Round, Author)>, + ) -> anyhow::Result<()> { + Ok(()) + } + + async fn send_epoch_change(&self, _proof: EpochChangeProof) {} + + async fn send_commit_proof(&self, _ledger_info: LedgerInfoWithSignatures) {} +} + +fn setup( + epoch_state: Arc, + dag_store: Arc>, + storage: Arc, +) -> StateSyncManager { + let network = Arc::new(MockDAGNetworkSender {}); + let time_service = TimeService::mock(); + let state_computer = Arc::new(EmptyStateComputer {}); + let upstream_notifier = Arc::new(MockNotifier {}); + + StateSyncManager::new( + epoch_state, + network, + upstream_notifier, + time_service, + state_computer, + storage, + dag_store, + ) +} + +#[tokio::test] +async fn test_dag_state_sync() { + const NUM_ROUNDS: usize = 90; + const LI_ROUNDS: usize = NUM_ROUNDS * 2 / 3; + const SLOW_DAG_ROUNDS: usize = NUM_ROUNDS / 3; + + let (signers, validator_verifier) = random_validator_verifier(4, None, false); + let validators = validator_verifier.get_ordered_account_addresses(); + let epoch_state = Arc::new(EpochState { + epoch: 1, + verifier: validator_verifier, + }); + let storage = Arc::new(MockStorage::new()); + + let virtual_dag = (0..NUM_ROUNDS) + .map(|_| { + signers + .iter() + .map(|_| Some(vec![true; signers.len() * 2 / 3 + 1])) + .collect() + }) + .collect::>(); + let nodes = generate_dag_nodes(&virtual_dag, &validators); + + let mut fast_dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1, 0); + for round_nodes in &nodes { + for node in round_nodes.iter().flatten() { + fast_dag.add_node(node.clone()).unwrap(); + } + } + let fast_dag = Arc::new(RwLock::new(fast_dag)); + + let mut slow_dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1, 0); + for round_nodes in nodes.iter().take(SLOW_DAG_ROUNDS) { + for node in round_nodes.iter().flatten() { + slow_dag.add_node(node.clone()).unwrap(); + } + } + let slow_dag = Arc::new(RwLock::new(slow_dag)); + + let li_node = nodes[LI_ROUNDS - 1].first().unwrap().clone().unwrap(); + let sync_to_li = LedgerInfoWithSignatures::new( + LedgerInfo::new( + BlockInfo::new( + epoch_state.epoch, + li_node.round(), + HashValue::zero(), + HashValue::zero(), + 0, + 0, + None, + ), + li_node.digest(), + ), + AggregateSignature::empty(), + ); + let sync_to_node = nodes[NUM_ROUNDS - 1].first().unwrap().clone().unwrap(); + + let sync_node_li = CertifiedNodeMessage::new(sync_to_node, sync_to_li); + + let state_sync = setup(epoch_state.clone(), slow_dag.clone(), storage.clone()); + let dag_fetcher = Arc::new(MockDagFetcher { + target_dag: fast_dag.clone(), + epoch_state: epoch_state.clone(), + }); + + let sync_result = state_sync + .sync_to_highest_ordered_anchor(&sync_node_li, dag_fetcher) + .await; + let new_dag = sync_result.unwrap().unwrap(); + + let dag_reader = new_dag.read(); + + assert_eq!(dag_reader.lowest_round(), (LI_ROUNDS - DAG_WINDOW) as Round); + assert_eq!(dag_reader.highest_round(), (NUM_ROUNDS - 1) as Round); + assert_none!(dag_reader.highest_ordered_anchor_round(),); + assert_eq!( + dag_reader.highest_committed_anchor_round(), + LI_ROUNDS as Round + ); +} diff --git a/consensus/src/dag/tests/helpers.rs b/consensus/src/dag/tests/helpers.rs index 84cad58ce6de1..b3939d6db4aa7 100644 --- a/consensus/src/dag/tests/helpers.rs +++ b/consensus/src/dag/tests/helpers.rs @@ -37,3 +37,43 @@ pub(crate) fn new_node( Extensions::empty(), ) } + +/// Generate certified nodes for dag given the virtual dag +pub(crate) fn generate_dag_nodes( + dag: &[Vec>>], + validators: &[Author], +) -> Vec>> { + let mut nodes = vec![]; + let mut previous_round: Vec> = vec![]; + for (round, round_nodes) in dag.iter().enumerate() { + let mut nodes_at_round = vec![]; + for (idx, author) in validators.iter().enumerate() { + if let Some(bitmask) = &round_nodes[idx] { + // the bitmask is compressed (without the holes), we need to flatten the previous round nodes + // to match the index + let parents: Vec<_> = previous_round + .iter() + .flatten() + .enumerate() + .filter(|(idx, _)| *bitmask.get(*idx).unwrap_or(&false)) + .map(|(_, node)| { + NodeCertificate::new(node.metadata().clone(), AggregateSignature::empty()) + }) + .collect(); + if round > 1 { + assert_eq!(parents.len(), validators.len() * 2 / 3 + 1); + } + nodes_at_round.push(Some(new_certified_node( + (round + 1) as u64, + *author, + parents, + ))); + } else { + nodes_at_round.push(None); + } + } + previous_round = nodes_at_round.clone(); + nodes.push(nodes_at_round); + } + nodes +} diff --git a/consensus/src/dag/tests/mod.rs b/consensus/src/dag/tests/mod.rs index 115bcd045144a..eee0c463dede8 100644 --- a/consensus/src/dag/tests/mod.rs +++ b/consensus/src/dag/tests/mod.rs @@ -3,6 +3,7 @@ mod dag_driver_tests; mod dag_network_test; +mod dag_state_sync_tests; mod dag_test; mod fetcher_test; mod helpers; diff --git a/consensus/src/dag/tests/order_rule_tests.rs b/consensus/src/dag/tests/order_rule_tests.rs index d40ab7e4069af..c403820cd89ca 100644 --- a/consensus/src/dag/tests/order_rule_tests.rs +++ b/consensus/src/dag/tests/order_rule_tests.rs @@ -8,8 +8,8 @@ use crate::{ dag_state_sync::DAG_WINDOW, dag_store::Dag, order_rule::OrderRule, - tests::{dag_test::MockStorage, helpers::new_certified_node}, - types::{NodeCertificate, NodeMetadata}, + tests::{dag_test::MockStorage, helpers::generate_dag_nodes}, + types::NodeMetadata, CertifiedNode, }, test_utils::placeholder_ledger_info, @@ -17,8 +17,7 @@ use crate::{ use aptos_consensus_types::common::{Author, Round}; use aptos_infallible::{Mutex, RwLock}; use aptos_types::{ - aggregate_signature::AggregateSignature, epoch_change::EpochChangeProof, - epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, + epoch_change::EpochChangeProof, epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, validator_verifier::random_validator_verifier, }; use async_trait::async_trait; @@ -81,46 +80,6 @@ fn generate_permutations( ) } -/// Generate certified nodes for dag given the virtual dag -fn generate_dag_nodes( - dag: &[Vec>>], - validators: &[Author], -) -> Vec>> { - let mut nodes = vec![]; - let mut previous_round: Vec> = vec![]; - for (round, round_nodes) in dag.iter().enumerate() { - let mut nodes_at_round = vec![]; - for (idx, author) in validators.iter().enumerate() { - if let Some(bitmask) = &round_nodes[idx] { - // the bitmask is compressed (without the holes), we need to flatten the previous round nodes - // to match the index - let parents: Vec<_> = previous_round - .iter() - .flatten() - .enumerate() - .filter(|(idx, _)| *bitmask.get(*idx).unwrap_or(&false)) - .map(|(_, node)| { - NodeCertificate::new(node.metadata().clone(), AggregateSignature::empty()) - }) - .collect(); - if round > 1 { - assert_eq!(parents.len(), validators.len() * 2 / 3 + 1); - } - nodes_at_round.push(Some(new_certified_node( - (round + 1) as u64, - *author, - parents, - ))); - } else { - nodes_at_round.push(None); - } - } - previous_round = nodes_at_round.clone(); - nodes.push(nodes_at_round); - } - nodes -} - pub struct TestNotifier { pub tx: UnboundedSender>>, }