diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index ba8c9047f989f9..9090463f1a689f 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use super::{ + order_rule::OrderRule, storage::DAGStorage, types::{CertifiedAck, DAGMessage}, RpcHandler, @@ -43,6 +44,7 @@ pub(crate) struct DagDriver { time_service: TimeService, rb_abort_handle: Option, storage: Arc, + order_rule: OrderRule, } impl DagDriver { @@ -55,6 +57,7 @@ impl DagDriver { current_round: Round, time_service: TimeService, storage: Arc, + order_rule: OrderRule, ) -> Self { // TODO: rebroadcast nodes after recovery Self { @@ -67,6 +70,7 @@ impl DagDriver { time_service, rb_abort_handle: None, storage, + order_rule, } } @@ -149,11 +153,13 @@ impl RpcHandler for DagDriver { { let dag_reader = self.dag.read(); if dag_reader.exists(node.metadata()) { - return Ok(CertifiedAck::new(node.metadata().epoch())); + return Ok(CertifiedAck::new(epoch)); } } - self.add_node(node)?; + let node_metadata = node.metadata().clone(); + self.add_node(node) + .map(|_| self.order_rule.process_new_node(&node_metadata))?; Ok(CertifiedAck::new(epoch)) } diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs index d6ff61e73cf3b9..e07a41f4f21047 100644 --- a/consensus/src/dag/dag_handler.rs +++ b/consensus/src/dag/dag_handler.rs @@ -2,7 +2,7 @@ use super::{ dag_driver::DagDriver, dag_fetcher::FetchRequestHandler, dag_network::DAGNetworkSender, - storage::DAGStorage, types::TDAGMessage, + order_rule::OrderRule, storage::DAGStorage, types::TDAGMessage, }; use crate::{ dag::{ @@ -45,6 +45,7 @@ impl NetworkHandler { _dag_network_sender: Arc, rb_network_sender: Arc>, time_service: TimeService, + order_rule: OrderRule, ) -> Self { let rb = Arc::new(ReliableBroadcast::new( epoch_state.verifier.get_ordered_account_addresses().clone(), @@ -69,6 +70,7 @@ impl NetworkHandler { 1, time_service, storage, + order_rule, ), epoch_state: epoch_state.clone(), fetch_receiver: FetchRequestHandler::new(dag, epoch_state), diff --git a/consensus/src/dag/order_rule.rs b/consensus/src/dag/order_rule.rs index 3cf87624419699..d48697d7e715e5 100644 --- a/consensus/src/dag/order_rule.rs +++ b/consensus/src/dag/order_rule.rs @@ -46,8 +46,8 @@ impl OrderRule { (r1 ^ r2) & 1 == 0 } - pub fn process_new_node(&mut self, node: &CertifiedNode) { - let round = node.round(); + pub fn process_new_node(&mut self, node_metadata: &NodeMetadata) { + let round = node_metadata.round(); // If the node comes from the proposal round in the current instance, it can't trigger any ordering if round <= self.lowest_unordered_anchor_round || Self::check_parity(round, self.lowest_unordered_anchor_round) diff --git a/consensus/src/dag/tests/dag_driver_tests.rs b/consensus/src/dag/tests/dag_driver_tests.rs index 11e6b2417b34f2..1b9ccd38869b98 100644 --- a/consensus/src/dag/tests/dag_driver_tests.rs +++ b/consensus/src/dag/tests/dag_driver_tests.rs @@ -2,9 +2,11 @@ use crate::{ dag::{ + anchor_election::RoundRobinAnchorElection, dag_driver::{DagDriver, DagDriverError}, dag_network::{DAGNetworkSender, RpcWithFallback}, dag_store::Dag, + order_rule::OrderRule, tests::{dag_test::MockStorage, helpers::new_certified_node}, types::{CertifiedAck, DAGMessage}, RpcHandler, @@ -15,7 +17,9 @@ use aptos_consensus_types::common::Author; use aptos_infallible::RwLock; use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast}; use aptos_time_service::TimeService; -use aptos_types::{epoch_state::EpochState, validator_verifier::random_validator_verifier}; +use aptos_types::{ + epoch_state::EpochState, ledger_info::LedgerInfo, validator_verifier::random_validator_verifier, +}; use async_trait::async_trait; use claims::{assert_ok, assert_ok_eq}; use std::{sync::Arc, time::Duration}; @@ -78,6 +82,15 @@ fn test_certified_node_handler() { aptos_time_service::TimeService::mock(), )); let time_service = TimeService::mock(); + let (ordered_nodes_sender, _) = futures_channel::mpsc::unbounded(); + let validators = signers.iter().map(|vs| vs.author()).collect(); + let order_rule = OrderRule::new( + epoch_state.clone(), + LedgerInfo::mock_genesis(None), + dag.clone(), + Box::new(RoundRobinAnchorElection::new(validators)), + ordered_nodes_sender, + ); let mut driver = DagDriver::new( signers[0].author(), epoch_state, @@ -87,6 +100,7 @@ fn test_certified_node_handler() { 1, time_service, storage, + order_rule, ); // expect an ack for a valid message diff --git a/consensus/src/dag/tests/order_rule_tests.rs b/consensus/src/dag/tests/order_rule_tests.rs index a56cfad56f5efc..a9b47656c5a804 100644 --- a/consensus/src/dag/tests/order_rule_tests.rs +++ b/consensus/src/dag/tests/order_rule_tests.rs @@ -167,7 +167,7 @@ proptest! { let dag = Arc::new(RwLock::new(dag.clone())); let (mut order_rule, mut receiver) = create_order_rule(epoch_state.clone(), dag); for idx in seq { - order_rule.process_new_node(&flatten_nodes[idx]); + order_rule.process_new_node(flatten_nodes[idx].metadata()); } let mut ordered = vec![]; while let Ok(Some(mut ordered_nodes)) = receiver.try_next() { @@ -241,7 +241,7 @@ fn test_order_rule_basic() { let dag = Arc::new(RwLock::new(dag.clone())); let (mut order_rule, mut receiver) = create_order_rule(epoch_state, dag); for node in nodes.iter().flatten().flatten() { - order_rule.process_new_node(node); + order_rule.process_new_node(node.metadata()); } let expected_order = vec![ // anchor (1, 0) has 1 votes, anchor (3, 1) has 2 votes and a path to (1, 0)