From 215f269a7aec41a48c0d67fc5c77425479e96c7a Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Tue, 5 Sep 2023 12:18:19 -0700 Subject: [PATCH] remove committed node status enum --- consensus/src/dag/bootstrap.rs | 1 + consensus/src/dag/dag_state_sync.rs | 29 ++---------- consensus/src/dag/dag_store.rs | 47 ++++--------------- consensus/src/dag/tests/dag_driver_tests.rs | 1 + .../src/dag/tests/dag_state_sync_tests.rs | 8 ++-- consensus/src/dag/tests/dag_test.rs | 6 +-- consensus/src/dag/tests/fetcher_test.rs | 2 +- consensus/src/dag/tests/order_rule_tests.rs | 4 +- consensus/src/dag/tests/rb_handler_tests.rs | 3 ++ 9 files changed, 28 insertions(+), 73 deletions(-) diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index 4fd9be4a1b999f..bfa1dcdbead517 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -65,6 +65,7 @@ pub fn bootstrap_dag( epoch_state.clone(), storage.clone(), current_round, + 0, ))); let anchor_election = Box::new(RoundRobinAnchorElection::new(validators)); diff --git a/consensus/src/dag/dag_state_sync.rs b/consensus/src/dag/dag_state_sync.rs index 329438f52f920b..e1e18a650f9439 100644 --- a/consensus/src/dag/dag_state_sync.rs +++ b/consensus/src/dag/dag_state_sync.rs @@ -8,7 +8,6 @@ use super::{ TDAGNetworkSender, }; use crate::state_replication::StateComputer; -use anyhow::anyhow; use aptos_infallible::RwLock; use aptos_logger::error; use aptos_time_service::TimeService; @@ -75,10 +74,7 @@ impl StateSyncManager { // if the anchor exists between ledger info round and highest ordered round // Note: ledger info round <= highest ordered round - if dag_reader - .highest_committed_anchor_round() - .unwrap_or_default() - < ledger_info.commit_info().round() + if dag_reader.highest_committed_anchor_round() < ledger_info.commit_info().round() && dag_reader .highest_ordered_anchor_round() .unwrap_or_default() @@ -98,10 +94,7 @@ impl StateSyncManager { .highest_ordered_anchor_round() .unwrap_or_default() < li.commit_info().round()) - || dag_reader - .highest_committed_anchor_round() - .unwrap_or_default() - + 2 * DAG_WINDOW + || dag_reader.highest_committed_anchor_round() + 2 * DAG_WINDOW < li.commit_info().round() } @@ -153,6 +146,7 @@ impl StateSyncManager { self.epoch_state.clone(), self.storage.clone(), start_round, + commit_li.commit_info().round(), ))); let bitmask = { sync_dag_store.read().bitmask(target_round) }; let request = RemoteFetchRequest::new( @@ -180,22 +174,7 @@ impl StateSyncManager { // State sync self.state_computer.sync_to(commit_li.clone()).await?; - { - let mut dag_writer = sync_dag_store.write(); - dag_writer.prune(); - if let Some(node_status) = dag_writer.get_node_ref_mut_by_round_digest( - commit_li.ledger_info().round(), - commit_li.ledger_info().consensus_data_hash(), - ) { - node_status.mark_as_committed(); - } else { - error!( - "node for commit ledger info does not exist in DAG: {}", - commit_li - ); - return Err(anyhow!("commit ledger info node not found")); - } - } + // TODO: the caller should rebootstrap the order rule Ok(Some(sync_dag_store)) } diff --git a/consensus/src/dag/dag_store.rs b/consensus/src/dag/dag_store.rs index 142d795f3d2f05..d6131df0fcef3b 100644 --- a/consensus/src/dag/dag_store.rs +++ b/consensus/src/dag/dag_store.rs @@ -20,15 +20,12 @@ use std::{ pub enum NodeStatus { Unordered(Arc), Ordered(Arc), - Committed(Arc), } impl NodeStatus { pub fn as_node(&self) -> &Arc { match self { - NodeStatus::Unordered(node) - | NodeStatus::Ordered(node) - | NodeStatus::Committed(node) => node, + NodeStatus::Unordered(node) | NodeStatus::Ordered(node) => node, } } @@ -36,14 +33,7 @@ impl NodeStatus { assert!(matches!(self, NodeStatus::Unordered(_))); *self = NodeStatus::Ordered(self.as_node().clone()); } - - pub fn mark_as_committed(&mut self) { - assert!(!matches!(self, NodeStatus::Committed(_))); - // TODO: try to avoid clone - *self = NodeStatus::Committed(self.as_node().clone()); - } } - /// Data structure that stores the DAG representation, it maintains round based index. #[derive(Clone)] pub struct Dag { @@ -53,6 +43,8 @@ pub struct Dag { storage: Arc, initial_round: Round, epoch_state: Arc, + + highest_committed_anchor_round: Round, } impl Dag { @@ -60,6 +52,7 @@ impl Dag { epoch_state: Arc, storage: Arc, initial_round: Round, + highest_committed_anchor_round: Round, ) -> Self { let epoch = epoch_state.epoch; let author_to_index = epoch_state.verifier.address_to_validator_index().clone(); @@ -91,6 +84,7 @@ impl Dag { storage, initial_round, epoch_state, + highest_committed_anchor_round, } } @@ -98,6 +92,7 @@ impl Dag { epoch_state: Arc, storage: Arc, initial_round: Round, + highest_committed_anchor_round: Round, ) -> Self { let author_to_index = epoch_state.verifier.address_to_validator_index().clone(); let nodes_by_round = BTreeMap::new(); @@ -107,6 +102,7 @@ impl Dag { storage, initial_round, epoch_state, + highest_committed_anchor_round, } } @@ -175,15 +171,6 @@ impl Dag { nodes.filter(|node_metadata| !self.exists(node_metadata)) } - pub fn get_node_ref_mut_by_round_digest( - &mut self, - round: Round, - digest: HashValue, - ) -> Option<&mut NodeStatus> { - self.get_round_iter_mut(round)? - .find(|node_status| node_status.as_node().digest() == digest) - } - fn get_node_ref_by_metadata(&self, metadata: &NodeMetadata) -> Option<&NodeStatus> { self.get_node_ref(metadata.round(), metadata.author()) } @@ -206,15 +193,6 @@ impl Dag { .map(|round_ref| round_ref.iter().flatten()) } - fn get_round_iter_mut( - &mut self, - round: Round, - ) -> Option> { - self.nodes_by_round - .get_mut(&round) - .map(|round_ref| round_ref.iter_mut().flatten()) - } - pub fn get_node(&self, metadata: &NodeMetadata) -> Option> { self.get_node_ref_by_metadata(metadata) .map(|node_status| node_status.as_node().clone()) @@ -382,14 +360,7 @@ impl Dag { None } - pub(super) fn highest_committed_anchor_round(&self) -> Option { - for (round, round_nodes) in self.nodes_by_round.iter().rev() { - for maybe_node_status in round_nodes { - if matches!(maybe_node_status, Some(NodeStatus::Committed(_))) { - return Some(*round); - } - } - } - None + pub(super) fn highest_committed_anchor_round(&self) -> Round { + self.highest_committed_anchor_round } } diff --git a/consensus/src/dag/tests/dag_driver_tests.rs b/consensus/src/dag/tests/dag_driver_tests.rs index 1e2f519c009787..51d1d6f8f4a368 100644 --- a/consensus/src/dag/tests/dag_driver_tests.rs +++ b/consensus/src/dag/tests/dag_driver_tests.rs @@ -79,6 +79,7 @@ async fn test_certified_node_handler() { epoch_state.clone(), storage.clone(), 1, + 0 ))); let network_sender = Arc::new(MockNetworkSender {}); diff --git a/consensus/src/dag/tests/dag_state_sync_tests.rs b/consensus/src/dag/tests/dag_state_sync_tests.rs index 307c1e9cb4d5ed..e1ae22309cb5fd 100644 --- a/consensus/src/dag/tests/dag_state_sync_tests.rs +++ b/consensus/src/dag/tests/dag_state_sync_tests.rs @@ -26,7 +26,7 @@ use aptos_types::{ validator_verifier::random_validator_verifier, }; use async_trait::async_trait; -use claims::{assert_none, assert_some_eq}; +use claims::assert_none; use std::{sync::Arc, time::Duration}; struct MockDAGNetworkSender {} @@ -150,7 +150,7 @@ async fn test_dag_state_sync() { .collect::>(); let nodes = generate_dag_nodes(&virtual_dag, &validators); - let mut fast_dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1); + let mut fast_dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1, 0); for round_nodes in &nodes { for node in round_nodes.iter().flatten() { fast_dag.add_node(node.clone()).unwrap(); @@ -158,7 +158,7 @@ async fn test_dag_state_sync() { } let fast_dag = Arc::new(RwLock::new(fast_dag)); - let mut slow_dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1); + let mut slow_dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1, 0); for round_nodes in nodes.iter().take(SLOW_DAG_ROUNDS) { for node in round_nodes.iter().flatten() { slow_dag.add_node(node.clone()).unwrap(); @@ -205,7 +205,7 @@ async fn test_dag_state_sync() { ); assert_eq!(dag_reader.highest_round(), (NUM_ROUNDS - 1) as Round); assert_none!(dag_reader.highest_ordered_anchor_round(),); - assert_some_eq!( + assert_eq!( dag_reader.highest_committed_anchor_round(), LI_ROUNDS as Round ); diff --git a/consensus/src/dag/tests/dag_test.rs b/consensus/src/dag/tests/dag_test.rs index 7d235f34ff5fd9..0c43698db2a86e 100644 --- a/consensus/src/dag/tests/dag_test.rs +++ b/consensus/src/dag/tests/dag_test.rs @@ -106,7 +106,7 @@ fn setup() -> (Vec, Arc, Dag, Arc) { verifier: validator_verifier, }); let storage = Arc::new(MockStorage::new()); - let dag = Dag::new(epoch_state.clone(), storage.clone(), 1); + let dag = Dag::new(epoch_state.clone(), storage.clone(), 1, 0); (signers, epoch_state, dag, storage) } @@ -194,7 +194,7 @@ fn test_dag_recover_from_storage() { assert!(dag.add_node(node).is_ok()); } } - let new_dag = Dag::new(epoch_state.clone(), storage.clone(), 1); + let new_dag = Dag::new(epoch_state.clone(), storage.clone(), 1, 0); for metadata in &metadatas { assert!(new_dag.exists(metadata)); @@ -205,7 +205,7 @@ fn test_dag_recover_from_storage() { verifier: epoch_state.verifier.clone(), }); - let _new_epoch_dag = Dag::new(new_epoch_state, storage.clone(), 1); + let _new_epoch_dag = Dag::new(new_epoch_state, storage.clone(), 1, 0); assert!(storage.certified_node_data.lock().is_empty()); } diff --git a/consensus/src/dag/tests/fetcher_test.rs b/consensus/src/dag/tests/fetcher_test.rs index 113c3b8d9e501b..f49674e01e066e 100644 --- a/consensus/src/dag/tests/fetcher_test.rs +++ b/consensus/src/dag/tests/fetcher_test.rs @@ -21,7 +21,7 @@ fn test_dag_fetcher_receiver() { verifier: validator_verifier, }); let storage = Arc::new(MockStorage::new()); - let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage, 1))); + let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage, 1, 0))); let mut fetcher = FetchRequestHandler::new(dag.clone(), epoch_state); diff --git a/consensus/src/dag/tests/order_rule_tests.rs b/consensus/src/dag/tests/order_rule_tests.rs index 3056d246b593a0..a647ab0957ef6b 100644 --- a/consensus/src/dag/tests/order_rule_tests.rs +++ b/consensus/src/dag/tests/order_rule_tests.rs @@ -131,7 +131,7 @@ proptest! { epoch: 1, verifier: validator_verifier, }); - let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1); + let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1, 0); for round_nodes in &nodes { for node in round_nodes.iter().flatten() { dag.add_node(node.clone()).unwrap(); @@ -218,7 +218,7 @@ fn test_order_rule_basic() { epoch: 1, verifier: validator_verifier, }); - let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1); + let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1, 0); for round_nodes in &nodes { for node in round_nodes.iter().flatten() { dag.add_node(node.clone()).unwrap(); diff --git a/consensus/src/dag/tests/rb_handler_tests.rs b/consensus/src/dag/tests/rb_handler_tests.rs index 5b85fa1dce856f..6afc89205496ac 100644 --- a/consensus/src/dag/tests/rb_handler_tests.rs +++ b/consensus/src/dag/tests/rb_handler_tests.rs @@ -44,6 +44,7 @@ async fn test_node_broadcast_receiver_succeed() { epoch_state.clone(), storage.clone(), 1, + 0 ))); let wellformed_node = new_node(1, 10, signers[0].author(), vec![]); @@ -87,6 +88,7 @@ async fn test_node_broadcast_receiver_failure() { epoch_state.clone(), storage.clone(), 1, + 0 ))); NodeBroadcastHandler::new( @@ -162,6 +164,7 @@ fn test_node_broadcast_receiver_storage() { epoch_state.clone(), storage.clone(), 1, + 0 ))); let node = new_node(1, 10, signers[0].author(), vec![]);