From 94171dba06c0bf081d4a6badabb697ed0706d601 Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Wed, 9 Aug 2023 15:07:49 -0700 Subject: [PATCH] [dag] Integrate dag fetcher with RB node handler --- consensus/src/dag/bootstrap.rs | 6 +- consensus/src/dag/dag_driver.rs | 2 +- consensus/src/dag/dag_fetcher.rs | 11 ++- consensus/src/dag/dag_store.rs | 8 +- consensus/src/dag/rb_handler.rs | 46 ++++++----- consensus/src/dag/tests/dag_driver_tests.rs | 13 ++- consensus/src/dag/tests/dag_test.rs | 6 +- consensus/src/dag/tests/fetcher_test.rs | 2 +- consensus/src/dag/tests/order_rule_tests.rs | 7 +- consensus/src/dag/tests/rb_handler_tests.rs | 90 ++++++++++++++++----- 10 files changed, 128 insertions(+), 63 deletions(-) diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index a07c5f73f1e008..98e4c85fd57306 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -56,7 +56,7 @@ pub fn bootstrap_dag( time_service.clone(), )); - let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone()))); + let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone(), current_round))); let anchor_election = Box::new(RoundRobinAnchorElection::new(validators)); let order_rule = OrderRule::new( @@ -86,10 +86,10 @@ pub fn bootstrap_dag( time_service, storage.clone(), order_rule, - fetch_requester, + fetch_requester.clone(), ); let rb_handler = - NodeBroadcastHandler::new(dag.clone(), signer, epoch_state.clone(), storage.clone()); + NodeBroadcastHandler::new(dag.clone(), signer, epoch_state.clone(), storage.clone(), fetch_requester); let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone()); let dag_handler = NetworkHandler::new( diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index 4ccb4b760c555f..9013f5498c4e3b 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -11,7 +11,7 @@ use super::{ use crate::{ dag::{ dag_store::Dag, - types::{CertificateAckState, CertifiedNode, Node, NodeCertificate, SignatureBuilder}, + types::{CertificateAckState, CertifiedNode, Node, NodeCertificate, SignatureBuilder}, dag_fetcher::TFetchRequester, }, state_replication::PayloadClient, }; diff --git a/consensus/src/dag/dag_fetcher.rs b/consensus/src/dag/dag_fetcher.rs index 7ebce618df1410..0ddddf30f8a6d6 100644 --- a/consensus/src/dag/dag_fetcher.rs +++ b/consensus/src/dag/dag_fetcher.rs @@ -53,14 +53,19 @@ impl Stream for FetchWaiter { } } +pub trait TFetchRequester: Send + Sync { + fn request_for_node(&self, node: Node) -> anyhow::Result<()>; + fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()>; +} + pub struct FetchRequester { request_tx: Sender, node_waiter_tx: Sender>, certified_node_waiter_tx: Sender>, } -impl FetchRequester { - pub fn request_for_node(&self, node: Node) -> anyhow::Result<()> { +impl TFetchRequester for FetchRequester { + fn request_for_node(&self, node: Node) -> anyhow::Result<()> { let (res_tx, res_rx) = oneshot::channel(); let fetch_req = LocalFetchRequest::Node(node, res_tx); self.request_tx @@ -70,7 +75,7 @@ impl FetchRequester { Ok(()) } - pub fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()> { + fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()> { let (res_tx, res_rx) = oneshot::channel(); let fetch_req = LocalFetchRequest::CertifiedNode(node, res_tx); self.request_tx.try_send(fetch_req).map_err(|e| { diff --git a/consensus/src/dag/dag_store.rs b/consensus/src/dag/dag_store.rs index 5e81f522db934e..58a12e41fa14c3 100644 --- a/consensus/src/dag/dag_store.rs +++ b/consensus/src/dag/dag_store.rs @@ -45,10 +45,11 @@ pub struct Dag { /// Map between peer id to vector index author_to_index: HashMap, storage: Arc, + initial_round: Round, } impl Dag { - pub fn new(epoch_state: Arc, storage: Arc) -> Self { + pub fn new(epoch_state: Arc, storage: Arc, initial_round: Round) -> Self { let epoch = epoch_state.epoch; let author_to_index = epoch_state.verifier.address_to_validator_index().clone(); let num_validators = author_to_index.len(); @@ -77,6 +78,7 @@ impl Dag { nodes_by_round, author_to_index, storage, + initial_round, } } @@ -85,7 +87,7 @@ impl Dag { .nodes_by_round .first_key_value() .map(|(round, _)| round) - .unwrap_or(&0) + .unwrap_or(&self.initial_round) } pub fn highest_round(&self) -> Round { @@ -93,7 +95,7 @@ impl Dag { .nodes_by_round .last_key_value() .map(|(round, _)| round) - .unwrap_or(&0) + .unwrap_or(&self.initial_round) } pub fn add_node(&mut self, node: CertifiedNode) -> anyhow::Result<()> { diff --git a/consensus/src/dag/rb_handler.rs b/consensus/src/dag/rb_handler.rs index 09bddf19623be9..b07ff039f3fdfb 100644 --- a/consensus/src/dag/rb_handler.rs +++ b/consensus/src/dag/rb_handler.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{storage::DAGStorage, NodeId}; +use super::{dag_fetcher::TFetchRequester, storage::DAGStorage, NodeId}; use crate::dag::{ dag_network::RpcHandler, dag_store::Dag, @@ -21,8 +21,8 @@ pub enum NodeBroadcastHandleError { InvalidParent, #[error("missing parents")] MissingParents, - #[error("parents do not meet quorum voting power")] - NotEnoughParents, + #[error("stale round number")] + StaleRound(Round), } pub(crate) struct NodeBroadcastHandler { @@ -31,6 +31,7 @@ pub(crate) struct NodeBroadcastHandler { signer: ValidatorSigner, epoch_state: Arc, storage: Arc, + fetch_requester: Arc, } impl NodeBroadcastHandler { @@ -39,6 +40,7 @@ impl NodeBroadcastHandler { signer: ValidatorSigner, epoch_state: Arc, storage: Arc, + fetch_requester: Arc, ) -> Self { let epoch = epoch_state.epoch; let votes_by_round_peer = read_votes_from_storage(&storage, epoch); @@ -49,6 +51,7 @@ impl NodeBroadcastHandler { signer, epoch_state, storage, + fetch_requester, } } @@ -67,21 +70,15 @@ impl NodeBroadcastHandler { self.storage.delete_votes(to_delete) } - fn validate(&self, node: &Node) -> anyhow::Result<()> { + fn validate(&self, node: Node) -> anyhow::Result { let current_round = node.metadata().round(); - // round 0 is a special case and does not require any parents - if current_round == 0 { - return Ok(()); - } - - let prev_round = current_round - 1; - let dag_reader = self.dag.read(); - // check if the parent round is missing in the DAG + let lowest_round = dag_reader.lowest_round(); + ensure!( - prev_round >= dag_reader.lowest_round(), - NodeBroadcastHandleError::MissingParents + current_round >= lowest_round, + NodeBroadcastHandleError::StaleRound(current_round) ); // check which parents are missing in the DAG @@ -91,19 +88,30 @@ impl NodeBroadcastHandler { .filter(|parent| !dag_reader.exists(parent.metadata())) .cloned() .collect(); + drop(dag_reader); // Drop the DAG store early as it is no longer required + if !missing_parents.is_empty() { - // For each missing parent, verify their signatures and voting power + // For each missing parent, verify their signatures and voting power. + // Otherwise, a malicious node can send bad nodes with fake parents + // and cause this peer to issue unnecessary fetch requests. ensure!( missing_parents .iter() .all(|parent| { parent.verify(&self.epoch_state.verifier).is_ok() }), NodeBroadcastHandleError::InvalidParent ); - // TODO: notify dag fetcher to fetch missing node and drop this node - bail!(NodeBroadcastHandleError::MissingParents); + + // Don't issue fetch requests for parents of the lowest round in the DAG + // because they are already GC'ed + if current_round > lowest_round { + if let Err(err) = self.fetch_requester.request_for_node(node) { + error!("request to fetch failed: {}", err); + } + bail!(NodeBroadcastHandleError::MissingParents); + } } - Ok(()) + Ok(node) } } @@ -137,7 +145,7 @@ impl RpcHandler for NodeBroadcastHandler { type Response = Vote; fn process(&mut self, node: Self::Request) -> anyhow::Result { - self.validate(&node)?; + let node = self.validate(node)?; let votes_by_peer = self .votes_by_round_peer diff --git a/consensus/src/dag/tests/dag_driver_tests.rs b/consensus/src/dag/tests/dag_driver_tests.rs index 6fb85f2399e97a..7c5b6c026edab6 100644 --- a/consensus/src/dag/tests/dag_driver_tests.rs +++ b/consensus/src/dag/tests/dag_driver_tests.rs @@ -72,9 +72,7 @@ fn test_certified_node_handler() { verifier: validator_verifier, }); let storage = Arc::new(MockStorage::new()); - let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone()))); - - let zeroth_round_node = new_certified_node(0, signers[0].author(), vec![]); + let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone(), 1))); let network_sender = Arc::new(MockNetworkSender {}); let rb = Arc::new(ReliableBroadcast::new( @@ -115,13 +113,14 @@ fn test_certified_node_handler() { fetch_requester, ); + let first_round_node = new_certified_node(1, signers[0].author(), vec![]); // expect an ack for a valid message - assert_ok!(driver.process(zeroth_round_node.clone())); + assert_ok!(driver.process(first_round_node.clone())); // expect an ack if the same message is sent again - assert_ok_eq!(driver.process(zeroth_round_node), CertifiedAck::new(1)); + assert_ok_eq!(driver.process(first_round_node), CertifiedAck::new(1)); - let parent_node = new_certified_node(0, signers[1].author(), vec![]); - let invalid_node = new_certified_node(1, signers[0].author(), vec![parent_node.certificate()]); + let parent_node = new_certified_node(1, signers[1].author(), vec![]); + let invalid_node = new_certified_node(2, signers[0].author(), vec![parent_node.certificate()]); assert_eq!( driver.process(invalid_node).unwrap_err().to_string(), DagDriverError::MissingParents.to_string() diff --git a/consensus/src/dag/tests/dag_test.rs b/consensus/src/dag/tests/dag_test.rs index 24a33797d00022..3743808870393b 100644 --- a/consensus/src/dag/tests/dag_test.rs +++ b/consensus/src/dag/tests/dag_test.rs @@ -102,7 +102,7 @@ fn setup() -> (Vec, Arc, Dag, Arc) { verifier: validator_verifier, }); let storage = Arc::new(MockStorage::new()); - let dag = Dag::new(epoch_state.clone(), storage.clone()); + let dag = Dag::new(epoch_state.clone(), storage.clone(), 1); (signers, epoch_state, dag, storage) } @@ -190,7 +190,7 @@ fn test_dag_recover_from_storage() { assert!(dag.add_node(node).is_ok()); } } - let new_dag = Dag::new(epoch_state.clone(), storage.clone()); + let new_dag = Dag::new(epoch_state.clone(), storage.clone(), 1); for metadata in &metadatas { assert!(new_dag.exists(metadata)); @@ -201,7 +201,7 @@ fn test_dag_recover_from_storage() { verifier: epoch_state.verifier.clone(), }); - let _new_epoch_dag = Dag::new(new_epoch_state, storage.clone()); + let _new_epoch_dag = Dag::new(new_epoch_state, storage.clone(), 1); assert!(storage.certified_node_data.lock().is_empty()); } diff --git a/consensus/src/dag/tests/fetcher_test.rs b/consensus/src/dag/tests/fetcher_test.rs index abecbcd4360392..113c3b8d9e501b 100644 --- a/consensus/src/dag/tests/fetcher_test.rs +++ b/consensus/src/dag/tests/fetcher_test.rs @@ -21,7 +21,7 @@ fn test_dag_fetcher_receiver() { verifier: validator_verifier, }); let storage = Arc::new(MockStorage::new()); - let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage))); + let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage, 1))); let mut fetcher = FetchRequestHandler::new(dag.clone(), epoch_state); diff --git a/consensus/src/dag/tests/order_rule_tests.rs b/consensus/src/dag/tests/order_rule_tests.rs index a9b47656c5a804..14e20c6db0c5f9 100644 --- a/consensus/src/dag/tests/order_rule_tests.rs +++ b/consensus/src/dag/tests/order_rule_tests.rs @@ -1,12 +1,13 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +use super::helpers::new_certified_node; use crate::{ dag::{ anchor_election::RoundRobinAnchorElection, dag_store::Dag, order_rule::OrderRule, - tests::{dag_test::MockStorage, helpers::new_certified_node}, + tests::dag_test::MockStorage, types::{NodeCertificate, NodeMetadata}, CertifiedNode, }, @@ -153,7 +154,7 @@ proptest! { epoch: 1, verifier: validator_verifier, }); - let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new())); + let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1); for round_nodes in &nodes { for node in round_nodes.iter().flatten() { dag.add_node(node.clone()).unwrap(); @@ -231,7 +232,7 @@ fn test_order_rule_basic() { epoch: 1, verifier: validator_verifier, }); - let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new())); + let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1); for round_nodes in &nodes { for node in round_nodes.iter().flatten() { dag.add_node(node.clone()).unwrap(); diff --git a/consensus/src/dag/tests/rb_handler_tests.rs b/consensus/src/dag/tests/rb_handler_tests.rs index 994f97a0474c07..bd1e18a4cd2685 100644 --- a/consensus/src/dag/tests/rb_handler_tests.rs +++ b/consensus/src/dag/tests/rb_handler_tests.rs @@ -2,10 +2,14 @@ // SPDX-License-Identifier: Apache-2.0 use crate::dag::{ + dag_fetcher::TFetchRequester, dag_store::Dag, rb_handler::{NodeBroadcastHandleError, NodeBroadcastHandler}, storage::DAGStorage, - tests::{dag_test::MockStorage, helpers::new_node}, + tests::{ + dag_test::MockStorage, + helpers::new_node, + }, types::NodeCertificate, NodeId, RpcHandler, Vote, }; @@ -17,22 +21,46 @@ use aptos_types::{ use claims::{assert_ok, assert_ok_eq}; use std::{collections::BTreeMap, sync::Arc}; +struct MockFetchRequester {} + +impl TFetchRequester for MockFetchRequester { + fn request_for_node(&self, _node: crate::dag::Node) -> anyhow::Result<()> { + Ok(()) + } + + fn request_for_certified_node(&self, _node: crate::dag::CertifiedNode) -> anyhow::Result<()> { + Ok(()) + } +} + #[tokio::test] async fn test_node_broadcast_receiver_succeed() { let (signers, validator_verifier) = random_validator_verifier(4, None, false); let epoch_state = Arc::new(EpochState { epoch: 1, - verifier: validator_verifier, + verifier: validator_verifier.clone(), }); + + // Scenario: Start DAG from beginning let storage = Arc::new(MockStorage::new()); - let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone()))); + let dag = Arc::new(RwLock::new(Dag::new( + epoch_state.clone(), + storage.clone(), + 1, + ))); - let wellformed_node = new_node(0, 10, signers[0].author(), vec![]); - let equivocating_node = new_node(0, 20, signers[0].author(), vec![]); + let wellformed_node = new_node(1, 10, signers[0].author(), vec![]); + let equivocating_node = new_node(1, 20, signers[0].author(), vec![]); assert_ne!(wellformed_node.digest(), equivocating_node.digest()); - let mut rb_receiver = NodeBroadcastHandler::new(dag, signers[3].clone(), epoch_state, storage); + let mut rb_receiver = NodeBroadcastHandler::new( + dag, + signers[3].clone(), + epoch_state.clone(), + storage.clone(), + Arc::new(MockFetchRequester {}), + ); let expected_result = Vote::new( wellformed_node.metadata().clone(), @@ -44,6 +72,8 @@ async fn test_node_broadcast_receiver_succeed() { assert_ok_eq!(rb_receiver.process(equivocating_node), expected_result); } +// TODO: Unit test node broad receiver with a pruned DAG store. Possibly need a validator verifier trait. + #[tokio::test] async fn test_node_broadcast_receiver_failure() { let (signers, validator_verifier) = random_validator_verifier(4, None, false); @@ -56,17 +86,27 @@ async fn test_node_broadcast_receiver_failure() { .iter() .map(|signer| { let storage = Arc::new(MockStorage::new()); - let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone()))); - - NodeBroadcastHandler::new(dag, signer.clone(), epoch_state.clone(), storage) + let dag = Arc::new(RwLock::new(Dag::new( + epoch_state.clone(), + storage.clone(), + 1, + ))); + + NodeBroadcastHandler::new( + dag, + signer.clone(), + epoch_state.clone(), + storage, + Arc::new(MockFetchRequester {}), + ) }) .collect(); - // Round 0 - let node = new_node(0, 10, signers[0].author(), vec![]); + // Round 1 + let node = new_node(1, 10, signers[0].author(), vec![]); let vote = rb_receivers[1].process(node.clone()).unwrap(); - // Round 1 with invalid parent + // Round 2 with invalid parent let partial_sigs = PartialSignatures::new(BTreeMap::from([( signers[1].author(), vote.signature().clone(), @@ -77,17 +117,17 @@ async fn test_node_broadcast_receiver_failure() { .aggregate_signatures(&partial_sigs) .unwrap(), ); - let node = new_node(1, 20, signers[0].author(), vec![node_cert]); + let node = new_node(2, 20, signers[0].author(), vec![node_cert]); assert_eq!( rb_receivers[1].process(node).unwrap_err().to_string(), NodeBroadcastHandleError::InvalidParent.to_string(), ); - // Round 0 - add all nodes + // Round 1 - add all nodes let node_certificates: Vec<_> = signers .iter() .map(|signer| { - let node = new_node(0, 10, signer.author(), vec![]); + let node = new_node(1, 10, signer.author(), vec![]); let mut partial_sigs = PartialSignatures::empty(); rb_receivers .iter_mut() @@ -105,8 +145,8 @@ async fn test_node_broadcast_receiver_failure() { }) .collect(); - // Add Round 1 node with proper certificates - let node = new_node(1, 20, signers[0].author(), node_certificates); + // Add Round 2 node with proper certificates + let node = new_node(2, 20, signers[0].author(), node_certificates); assert_eq!( rb_receivers[0].process(node).unwrap_err().to_string(), NodeBroadcastHandleError::MissingParents.to_string() @@ -121,7 +161,11 @@ fn test_node_broadcast_receiver_storage() { verifier: validator_verifier, }); let storage = Arc::new(MockStorage::new()); - let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone()))); + let dag = Arc::new(RwLock::new(Dag::new( + epoch_state.clone(), + storage.clone(), + 1, + ))); let node = new_node(1, 10, signers[0].author(), vec![]); @@ -130,6 +174,7 @@ fn test_node_broadcast_receiver_storage() { signers[3].clone(), epoch_state.clone(), storage.clone(), + Arc::new(MockFetchRequester {}), ); let sig = rb_receiver.process(node).expect("must succeed"); @@ -138,8 +183,13 @@ fn test_node_broadcast_receiver_storage() { sig )],); - let mut rb_receiver = - NodeBroadcastHandler::new(dag, signers[3].clone(), epoch_state, storage.clone()); + let mut rb_receiver = NodeBroadcastHandler::new( + dag, + signers[3].clone(), + epoch_state, + storage.clone(), + Arc::new(MockFetchRequester {}), + ); assert_ok!(rb_receiver.gc_before_round(2)); assert_eq!(storage.get_votes().unwrap().len(), 0); }