diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index 3d7c0289b8e846..76a8233094a646 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -88,10 +88,10 @@ pub fn bootstrap_dag( time_service, storage.clone(), order_rule, - fetch_requester, + fetch_requester.clone(), ); let rb_handler = - NodeBroadcastHandler::new(dag.clone(), signer, epoch_state.clone(), storage.clone()); + NodeBroadcastHandler::new(dag.clone(), signer, epoch_state.clone(), storage.clone(), fetch_requester); let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone()); let dag_handler = NetworkHandler::new( diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index 9207add142d64e..d8619356728f29 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -11,7 +11,7 @@ use super::{ use crate::{ dag::{ dag_store::Dag, - types::{CertificateAckState, CertifiedNode, Node, NodeCertificate, SignatureBuilder}, + types::{CertificateAckState, CertifiedNode, Node, NodeCertificate, SignatureBuilder}, dag_fetcher::TFetchRequester, }, state_replication::PayloadClient, }; diff --git a/consensus/src/dag/dag_fetcher.rs b/consensus/src/dag/dag_fetcher.rs index 8216b6ae14c0cf..72f21c4551e081 100644 --- a/consensus/src/dag/dag_fetcher.rs +++ b/consensus/src/dag/dag_fetcher.rs @@ -53,14 +53,19 @@ impl Stream for FetchWaiter { } } +pub trait TFetchRequester: Send + Sync { + fn request_for_node(&self, node: Node) -> anyhow::Result<()>; + fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()>; +} + pub struct FetchRequester { request_tx: Sender, node_tx: Sender>, certified_node_tx: Sender>, } -impl FetchRequester { - pub fn request_for_node(&self, node: Node) -> anyhow::Result<()> { +impl TFetchRequester for FetchRequester { + fn request_for_node(&self, node: Node) -> anyhow::Result<()> { let (res_tx, res_rx) = oneshot::channel(); let fetch_req = LocalFetchRequest::Node(node, res_tx); self.request_tx @@ -70,7 +75,7 @@ impl FetchRequester { Ok(()) } - pub fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()> { + fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()> { let (res_tx, res_rx) = oneshot::channel(); let fetch_req = LocalFetchRequest::CertifiedNode(node, res_tx); self.request_tx diff --git a/consensus/src/dag/rb_handler.rs b/consensus/src/dag/rb_handler.rs index 09bddf19623be9..953c5b7a065aeb 100644 --- a/consensus/src/dag/rb_handler.rs +++ b/consensus/src/dag/rb_handler.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::{storage::DAGStorage, NodeId}; +use super::{dag_fetcher::TFetchRequester, storage::DAGStorage, NodeId}; use crate::dag::{ dag_network::RpcHandler, dag_store::Dag, @@ -23,6 +23,8 @@ pub enum NodeBroadcastHandleError { MissingParents, #[error("parents do not meet quorum voting power")] NotEnoughParents, + #[error("invalid round number")] + InvalidRound, } pub(crate) struct NodeBroadcastHandler { @@ -31,6 +33,7 @@ pub(crate) struct NodeBroadcastHandler { signer: ValidatorSigner, epoch_state: Arc, storage: Arc, + fetch_requester: Arc, } impl NodeBroadcastHandler { @@ -39,6 +42,7 @@ impl NodeBroadcastHandler { signer: ValidatorSigner, epoch_state: Arc, storage: Arc, + fetch_requester: Arc, ) -> Self { let epoch = epoch_state.epoch; let votes_by_round_peer = read_votes_from_storage(&storage, epoch); @@ -49,6 +53,7 @@ impl NodeBroadcastHandler { signer, epoch_state, storage, + fetch_requester, } } @@ -67,22 +72,24 @@ impl NodeBroadcastHandler { self.storage.delete_votes(to_delete) } - fn validate(&self, node: &Node) -> anyhow::Result<()> { + fn validate(&self, node: Node) -> anyhow::Result { let current_round = node.metadata().round(); // round 0 is a special case and does not require any parents if current_round == 0 { - return Ok(()); + bail!(NodeBroadcastHandleError::InvalidRound); } let prev_round = current_round - 1; let dag_reader = self.dag.read(); // check if the parent round is missing in the DAG - ensure!( - prev_round >= dag_reader.lowest_round(), - NodeBroadcastHandleError::MissingParents - ); + if prev_round >= dag_reader.lowest_round() { + if let Err(err) = self.fetch_requester.request_for_node(node) { + error!("request to fetch failed: {}", err); + } + bail!(NodeBroadcastHandleError::MissingParents); + } // check which parents are missing in the DAG let missing_parents: Vec = node @@ -99,11 +106,13 @@ impl NodeBroadcastHandler { .all(|parent| { parent.verify(&self.epoch_state.verifier).is_ok() }), NodeBroadcastHandleError::InvalidParent ); - // TODO: notify dag fetcher to fetch missing node and drop this node + if let Err(err) = self.fetch_requester.request_for_node(node) { + error!("request to fetch failed: {}", err); + } bail!(NodeBroadcastHandleError::MissingParents); } - Ok(()) + Ok(node) } } @@ -137,7 +146,7 @@ impl RpcHandler for NodeBroadcastHandler { type Response = Vote; fn process(&mut self, node: Self::Request) -> anyhow::Result { - self.validate(&node)?; + let node = self.validate(node)?; let votes_by_peer = self .votes_by_round_peer diff --git a/consensus/src/dag/tests/rb_handler_tests.rs b/consensus/src/dag/tests/rb_handler_tests.rs index 994f97a0474c07..22db14534218b1 100644 --- a/consensus/src/dag/tests/rb_handler_tests.rs +++ b/consensus/src/dag/tests/rb_handler_tests.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::dag::{ + dag_fetcher::TFetchRequester, dag_store::Dag, rb_handler::{NodeBroadcastHandleError, NodeBroadcastHandler}, storage::DAGStorage, @@ -17,6 +18,18 @@ use aptos_types::{ use claims::{assert_ok, assert_ok_eq}; use std::{collections::BTreeMap, sync::Arc}; +struct MockFetchRequester {} + +impl TFetchRequester for MockFetchRequester { + fn request_for_node(&self, node: crate::dag::Node) -> anyhow::Result<()> { + Ok(()) + } + + fn request_for_certified_node(&self, node: crate::dag::CertifiedNode) -> anyhow::Result<()> { + Ok(()) + } +} + #[tokio::test] async fn test_node_broadcast_receiver_succeed() { let (signers, validator_verifier) = random_validator_verifier(4, None, false); @@ -32,7 +45,13 @@ async fn test_node_broadcast_receiver_succeed() { assert_ne!(wellformed_node.digest(), equivocating_node.digest()); - let mut rb_receiver = NodeBroadcastHandler::new(dag, signers[3].clone(), epoch_state, storage); + let mut rb_receiver = NodeBroadcastHandler::new( + dag, + signers[3].clone(), + epoch_state, + storage, + Arc::new(MockFetchRequester {}), + ); let expected_result = Vote::new( wellformed_node.metadata().clone(), @@ -58,7 +77,13 @@ async fn test_node_broadcast_receiver_failure() { let storage = Arc::new(MockStorage::new()); let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone()))); - NodeBroadcastHandler::new(dag, signer.clone(), epoch_state.clone(), storage) + NodeBroadcastHandler::new( + dag, + signer.clone(), + epoch_state.clone(), + storage, + Arc::new(MockFetchRequester {}), + ) }) .collect(); @@ -130,6 +155,7 @@ fn test_node_broadcast_receiver_storage() { signers[3].clone(), epoch_state.clone(), storage.clone(), + Arc::new(MockFetchRequester {}), ); let sig = rb_receiver.process(node).expect("must succeed"); @@ -138,8 +164,13 @@ fn test_node_broadcast_receiver_storage() { sig )],); - let mut rb_receiver = - NodeBroadcastHandler::new(dag, signers[3].clone(), epoch_state, storage.clone()); + let mut rb_receiver = NodeBroadcastHandler::new( + dag, + signers[3].clone(), + epoch_state, + storage.clone(), + Arc::new(MockFetchRequester {}), + ); assert_ok!(rb_receiver.gc_before_round(2)); assert_eq!(storage.get_votes().unwrap().len(), 0); }