Skip to content

Commit

Permalink
remove committed node status enum
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Sep 5, 2023
1 parent 3ce44ce commit 215f269
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 73 deletions.
1 change: 1 addition & 0 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub fn bootstrap_dag(
epoch_state.clone(),
storage.clone(),
current_round,
0,
)));

let anchor_election = Box::new(RoundRobinAnchorElection::new(validators));
Expand Down
29 changes: 4 additions & 25 deletions consensus/src/dag/dag_state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use super::{
TDAGNetworkSender,
};
use crate::state_replication::StateComputer;
use anyhow::anyhow;
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_time_service::TimeService;
Expand Down Expand Up @@ -75,10 +74,7 @@ impl StateSyncManager {

// if the anchor exists between ledger info round and highest ordered round
// Note: ledger info round <= highest ordered round
if dag_reader
.highest_committed_anchor_round()
.unwrap_or_default()
< ledger_info.commit_info().round()
if dag_reader.highest_committed_anchor_round() < ledger_info.commit_info().round()
&& dag_reader
.highest_ordered_anchor_round()
.unwrap_or_default()
Expand All @@ -98,10 +94,7 @@ impl StateSyncManager {
.highest_ordered_anchor_round()
.unwrap_or_default()
< li.commit_info().round())
|| dag_reader
.highest_committed_anchor_round()
.unwrap_or_default()
+ 2 * DAG_WINDOW
|| dag_reader.highest_committed_anchor_round() + 2 * DAG_WINDOW
< li.commit_info().round()
}

Expand Down Expand Up @@ -153,6 +146,7 @@ impl StateSyncManager {
self.epoch_state.clone(),
self.storage.clone(),
start_round,
commit_li.commit_info().round(),
)));
let bitmask = { sync_dag_store.read().bitmask(target_round) };
let request = RemoteFetchRequest::new(
Expand Down Expand Up @@ -180,22 +174,7 @@ impl StateSyncManager {
// State sync
self.state_computer.sync_to(commit_li.clone()).await?;

{
let mut dag_writer = sync_dag_store.write();
dag_writer.prune();
if let Some(node_status) = dag_writer.get_node_ref_mut_by_round_digest(
commit_li.ledger_info().round(),
commit_li.ledger_info().consensus_data_hash(),
) {
node_status.mark_as_committed();
} else {
error!(
"node for commit ledger info does not exist in DAG: {}",
commit_li
);
return Err(anyhow!("commit ledger info node not found"));
}
}
// TODO: the caller should rebootstrap the order rule

Ok(Some(sync_dag_store))
}
Expand Down
47 changes: 9 additions & 38 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,20 @@ use std::{
pub enum NodeStatus {
Unordered(Arc<CertifiedNode>),
Ordered(Arc<CertifiedNode>),
Committed(Arc<CertifiedNode>),
}

impl NodeStatus {
pub fn as_node(&self) -> &Arc<CertifiedNode> {
match self {
NodeStatus::Unordered(node)
| NodeStatus::Ordered(node)
| NodeStatus::Committed(node) => node,
NodeStatus::Unordered(node) | NodeStatus::Ordered(node) => node,
}
}

pub fn mark_as_ordered(&mut self) {
assert!(matches!(self, NodeStatus::Unordered(_)));
*self = NodeStatus::Ordered(self.as_node().clone());
}

pub fn mark_as_committed(&mut self) {
assert!(!matches!(self, NodeStatus::Committed(_)));
// TODO: try to avoid clone
*self = NodeStatus::Committed(self.as_node().clone());
}
}

/// Data structure that stores the DAG representation, it maintains round based index.
#[derive(Clone)]
pub struct Dag {
Expand All @@ -53,13 +43,16 @@ pub struct Dag {
storage: Arc<dyn DAGStorage>,
initial_round: Round,
epoch_state: Arc<EpochState>,

highest_committed_anchor_round: Round,
}

impl Dag {
pub fn new(
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
initial_round: Round,
highest_committed_anchor_round: Round,
) -> Self {
let epoch = epoch_state.epoch;
let author_to_index = epoch_state.verifier.address_to_validator_index().clone();
Expand Down Expand Up @@ -91,13 +84,15 @@ impl Dag {
storage,
initial_round,
epoch_state,
highest_committed_anchor_round,
}
}

pub fn new_empty(
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
initial_round: Round,
highest_committed_anchor_round: Round,
) -> Self {
let author_to_index = epoch_state.verifier.address_to_validator_index().clone();
let nodes_by_round = BTreeMap::new();
Expand All @@ -107,6 +102,7 @@ impl Dag {
storage,
initial_round,
epoch_state,
highest_committed_anchor_round,
}
}

Expand Down Expand Up @@ -175,15 +171,6 @@ impl Dag {
nodes.filter(|node_metadata| !self.exists(node_metadata))
}

pub fn get_node_ref_mut_by_round_digest(
&mut self,
round: Round,
digest: HashValue,
) -> Option<&mut NodeStatus> {
self.get_round_iter_mut(round)?
.find(|node_status| node_status.as_node().digest() == digest)
}

fn get_node_ref_by_metadata(&self, metadata: &NodeMetadata) -> Option<&NodeStatus> {
self.get_node_ref(metadata.round(), metadata.author())
}
Expand All @@ -206,15 +193,6 @@ impl Dag {
.map(|round_ref| round_ref.iter().flatten())
}

fn get_round_iter_mut(
&mut self,
round: Round,
) -> Option<impl Iterator<Item = &mut NodeStatus>> {
self.nodes_by_round
.get_mut(&round)
.map(|round_ref| round_ref.iter_mut().flatten())
}

pub fn get_node(&self, metadata: &NodeMetadata) -> Option<Arc<CertifiedNode>> {
self.get_node_ref_by_metadata(metadata)
.map(|node_status| node_status.as_node().clone())
Expand Down Expand Up @@ -382,14 +360,7 @@ impl Dag {
None
}

pub(super) fn highest_committed_anchor_round(&self) -> Option<Round> {
for (round, round_nodes) in self.nodes_by_round.iter().rev() {
for maybe_node_status in round_nodes {
if matches!(maybe_node_status, Some(NodeStatus::Committed(_))) {
return Some(*round);
}
}
}
None
pub(super) fn highest_committed_anchor_round(&self) -> Round {
self.highest_committed_anchor_round
}
}
1 change: 1 addition & 0 deletions consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ async fn test_certified_node_handler() {
epoch_state.clone(),
storage.clone(),
1,
0
)));

let network_sender = Arc::new(MockNetworkSender {});
Expand Down
8 changes: 4 additions & 4 deletions consensus/src/dag/tests/dag_state_sync_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use aptos_types::{
validator_verifier::random_validator_verifier,
};
use async_trait::async_trait;
use claims::{assert_none, assert_some_eq};
use claims::assert_none;
use std::{sync::Arc, time::Duration};

struct MockDAGNetworkSender {}
Expand Down Expand Up @@ -150,15 +150,15 @@ async fn test_dag_state_sync() {
.collect::<Vec<_>>();
let nodes = generate_dag_nodes(&virtual_dag, &validators);

let mut fast_dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1);
let mut fast_dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1, 0);
for round_nodes in &nodes {
for node in round_nodes.iter().flatten() {
fast_dag.add_node(node.clone()).unwrap();
}
}
let fast_dag = Arc::new(RwLock::new(fast_dag));

let mut slow_dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1);
let mut slow_dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1, 0);
for round_nodes in nodes.iter().take(SLOW_DAG_ROUNDS) {
for node in round_nodes.iter().flatten() {
slow_dag.add_node(node.clone()).unwrap();
Expand Down Expand Up @@ -205,7 +205,7 @@ async fn test_dag_state_sync() {
);
assert_eq!(dag_reader.highest_round(), (NUM_ROUNDS - 1) as Round);
assert_none!(dag_reader.highest_ordered_anchor_round(),);
assert_some_eq!(
assert_eq!(
dag_reader.highest_committed_anchor_round(),
LI_ROUNDS as Round
);
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/dag/tests/dag_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ fn setup() -> (Vec<ValidatorSigner>, Arc<EpochState>, Dag, Arc<MockStorage>) {
verifier: validator_verifier,
});
let storage = Arc::new(MockStorage::new());
let dag = Dag::new(epoch_state.clone(), storage.clone(), 1);
let dag = Dag::new(epoch_state.clone(), storage.clone(), 1, 0);
(signers, epoch_state, dag, storage)
}

Expand Down Expand Up @@ -194,7 +194,7 @@ fn test_dag_recover_from_storage() {
assert!(dag.add_node(node).is_ok());
}
}
let new_dag = Dag::new(epoch_state.clone(), storage.clone(), 1);
let new_dag = Dag::new(epoch_state.clone(), storage.clone(), 1, 0);

for metadata in &metadatas {
assert!(new_dag.exists(metadata));
Expand All @@ -205,7 +205,7 @@ fn test_dag_recover_from_storage() {
verifier: epoch_state.verifier.clone(),
});

let _new_epoch_dag = Dag::new(new_epoch_state, storage.clone(), 1);
let _new_epoch_dag = Dag::new(new_epoch_state, storage.clone(), 1, 0);
assert!(storage.certified_node_data.lock().is_empty());
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/fetcher_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn test_dag_fetcher_receiver() {
verifier: validator_verifier,
});
let storage = Arc::new(MockStorage::new());
let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage, 1)));
let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage, 1, 0)));

let mut fetcher = FetchRequestHandler::new(dag.clone(), epoch_state);

Expand Down
4 changes: 2 additions & 2 deletions consensus/src/dag/tests/order_rule_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ proptest! {
epoch: 1,
verifier: validator_verifier,
});
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1);
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1, 0);
for round_nodes in &nodes {
for node in round_nodes.iter().flatten() {
dag.add_node(node.clone()).unwrap();
Expand Down Expand Up @@ -218,7 +218,7 @@ fn test_order_rule_basic() {
epoch: 1,
verifier: validator_verifier,
});
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1);
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1, 0);
for round_nodes in &nodes {
for node in round_nodes.iter().flatten() {
dag.add_node(node.clone()).unwrap();
Expand Down
3 changes: 3 additions & 0 deletions consensus/src/dag/tests/rb_handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ async fn test_node_broadcast_receiver_succeed() {
epoch_state.clone(),
storage.clone(),
1,
0
)));

let wellformed_node = new_node(1, 10, signers[0].author(), vec![]);
Expand Down Expand Up @@ -87,6 +88,7 @@ async fn test_node_broadcast_receiver_failure() {
epoch_state.clone(),
storage.clone(),
1,
0
)));

NodeBroadcastHandler::new(
Expand Down Expand Up @@ -162,6 +164,7 @@ fn test_node_broadcast_receiver_storage() {
epoch_state.clone(),
storage.clone(),
1,
0
)));

let node = new_node(1, 10, signers[0].author(), vec![]);
Expand Down

0 comments on commit 215f269

Please sign in to comment.