From 0982a4f5972068626a74a4285b3d296e8d10720c Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Wed, 5 Jul 2023 22:16:40 -0700 Subject: [PATCH] [dag] certified node handler (#8919) --- consensus/src/dag/dag_driver.rs | 7 ++- consensus/src/dag/dag_handler.rs | 21 ++++++++- consensus/src/dag/reliable_broadcast.rs | 45 +++++++++++++++++++ consensus/src/dag/tests/dag_test.rs | 18 ++------ consensus/src/dag/tests/helpers.rs | 23 ++++++++++ consensus/src/dag/tests/mod.rs | 1 + .../src/dag/tests/reliable_broadcast_tests.rs | 41 ++++++++++++++--- consensus/src/dag/types.rs | 30 +++++++++---- 8 files changed, 151 insertions(+), 35 deletions(-) create mode 100644 consensus/src/dag/tests/helpers.rs diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index 90b2dc81a33cb..8433d07ca2fe8 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -95,8 +95,11 @@ impl DagDriver { let cert_ack_set = CertificateAckState::new(self.epoch_state.verifier.len()); let task = self .reliable_broadcast - .broadcast(node, signature_builder) - .then(move |certificate| rb.broadcast(certificate, cert_ack_set)); + .broadcast(node.clone(), signature_builder) + .then(move |certificate| { + let certified_node = CertifiedNode::new(node, certificate.signatures().to_owned()); + rb.broadcast(certified_node, cert_ack_set) + }); tokio::spawn(Abortable::new(task, abort_registration)); if let Some(prev_handle) = self.rb_abort_handle.replace(abort_handle) { prev_handle.abort(); diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs index 06860d827e586..b7b3530d14ecf 100644 --- a/consensus/src/dag/dag_handler.rs +++ b/consensus/src/dag/dag_handler.rs @@ -1,5 +1,6 @@ // Copyright © Aptos Foundation +use super::{reliable_broadcast::CertifiedNodeHandler, types::TDAGMessage}; use crate::{ dag::{ dag_network::RpcHandler, dag_store::Dag, reliable_broadcast::NodeBroadcastHandler, @@ -20,6 +21,8 @@ use std::sync::Arc; struct NetworkHandler { dag_rpc_rx: aptos_channel::Receiver, node_receiver: NodeBroadcastHandler, + certified_node_receiver: CertifiedNodeHandler, + epoch_state: Arc, } impl NetworkHandler { @@ -31,7 +34,13 @@ impl NetworkHandler { ) -> Self { Self { dag_rpc_rx, - node_receiver: NodeBroadcastHandler::new(dag, signer, epoch_state.verifier.clone()), + node_receiver: NodeBroadcastHandler::new( + dag.clone(), + signer, + epoch_state.verifier.clone(), + ), + certified_node_receiver: CertifiedNodeHandler::new(dag), + epoch_state, } } @@ -45,8 +54,16 @@ impl NetworkHandler { async fn process_rpc(&mut self, rpc_request: IncomingDAGRequest) -> anyhow::Result<()> { let dag_message: DAGMessage = TConsensusMsg::from_network_message(rpc_request.req)?; + let response: anyhow::Result = match dag_message { - DAGMessage::NodeMsg(node) => self.node_receiver.process(node).map(|r| r.into()), + DAGMessage::NodeMsg(node) => node + .verify(&self.epoch_state.verifier) + .and_then(|_| self.node_receiver.process(node)) + .map(|r| r.into()), + DAGMessage::CertifiedNodeMsg(node) => node + .verify(&self.epoch_state.verifier) + .and_then(|_| self.certified_node_receiver.process(node)) + .map(|r| r.into()), _ => { error!("unknown rpc message {:?}", dag_message); Err(anyhow::anyhow!("unknown rpc message")) diff --git a/consensus/src/dag/reliable_broadcast.rs b/consensus/src/dag/reliable_broadcast.rs index 0518df03fe1d6..f24d5b59f2a04 100644 --- a/consensus/src/dag/reliable_broadcast.rs +++ b/consensus/src/dag/reliable_broadcast.rs @@ -1,6 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 +use super::types::{CertifiedAck, CertifiedNode}; use crate::{ dag::{ dag_network::{DAGNetworkSender, RpcHandler}, @@ -118,6 +119,7 @@ impl NodeBroadcastHandler { fn validate(&self, node: &Node) -> anyhow::Result<()> { let current_round = node.metadata().round(); + // round 0 is a special case and does not require any parents if current_round == 0 { return Ok(()); @@ -184,3 +186,46 @@ impl RpcHandler for NodeBroadcastHandler { } } } + +#[derive(Debug, ThisError)] +pub enum CertifiedNodeHandleError { + #[error("node already exists")] + NodeExists, + #[error("missing parents")] + MissingParents, +} + +pub struct CertifiedNodeHandler { + dag: Arc>, +} + +impl CertifiedNodeHandler { + pub fn new(dag: Arc>) -> Self { + Self { dag } + } +} + +impl RpcHandler for CertifiedNodeHandler { + type Request = CertifiedNode; + type Response = CertifiedAck; + + fn process(&mut self, node: Self::Request) -> anyhow::Result { + { + let dag_reader = self.dag.read(); + if dag_reader.exists(&node.digest()) { + return Ok(CertifiedAck::new(node.metadata().epoch())); + } + + if !dag_reader.all_exists(node.parents()) { + // TODO(ibalajiarun): implement fetching logic. + bail!(CertifiedNodeHandleError::MissingParents); + } + } + + let mut dag_writer = self.dag.write(); + let epoch = node.metadata().epoch(); + dag_writer.add_node(node)?; + + Ok(CertifiedAck::new(epoch)) + } +} diff --git a/consensus/src/dag/tests/dag_test.rs b/consensus/src/dag/tests/dag_test.rs index 7f84f3be4aa4a..19f5ee6f2296a 100644 --- a/consensus/src/dag/tests/dag_test.rs +++ b/consensus/src/dag/tests/dag_test.rs @@ -4,15 +4,12 @@ use crate::dag::{ dag_store::Dag, storage::DAGStorage, - types::{CertifiedNode, Node, NodeCertificate}, + tests::helpers::new_certified_node, + types::{CertifiedNode, Node}, }; -use aptos_consensus_types::common::{Author, Payload, Round}; use aptos_crypto::HashValue; use aptos_infallible::Mutex; -use aptos_types::{ - aggregate_signature::AggregateSignature, epoch_state::EpochState, - validator_verifier::random_validator_verifier, -}; +use aptos_types::{epoch_state::EpochState, validator_verifier::random_validator_verifier}; use std::{collections::HashMap, sync::Arc}; pub struct MockStorage { @@ -170,12 +167,3 @@ fn test_dag_recover_from_storage() { let _new_epoch_dag = Dag::new(new_epoch_state, storage.clone()); assert!(storage.certified_node_data.lock().is_empty()); } - -fn new_certified_node( - round: Round, - author: Author, - parents: Vec, -) -> CertifiedNode { - let node = Node::new(1, round, author, 0, Payload::empty(false), parents); - CertifiedNode::new(node, AggregateSignature::empty()) -} diff --git a/consensus/src/dag/tests/helpers.rs b/consensus/src/dag/tests/helpers.rs new file mode 100644 index 0000000000000..b6eb7a1c9bad8 --- /dev/null +++ b/consensus/src/dag/tests/helpers.rs @@ -0,0 +1,23 @@ +// Copyright © Aptos Foundation + +use crate::dag::types::{CertifiedNode, Node, NodeCertificate}; +use aptos_consensus_types::common::{Author, Payload, Round}; +use aptos_types::aggregate_signature::AggregateSignature; + +pub(crate) fn new_certified_node( + round: Round, + author: Author, + parents: Vec, +) -> CertifiedNode { + let node = Node::new(1, round, author, 0, Payload::empty(false), parents); + CertifiedNode::new(node, AggregateSignature::empty()) +} + +pub(crate) fn new_node( + round: Round, + timestamp: u64, + author: Author, + parents: Vec, +) -> Node { + Node::new(0, round, author, timestamp, Payload::empty(false), parents) +} diff --git a/consensus/src/dag/tests/mod.rs b/consensus/src/dag/tests/mod.rs index c5254238605a4..ce2188c8c723e 100644 --- a/consensus/src/dag/tests/mod.rs +++ b/consensus/src/dag/tests/mod.rs @@ -2,4 +2,5 @@ // SPDX-License-Identifier: Apache-2.0 mod dag_test; +mod helpers; mod reliable_broadcast_tests; diff --git a/consensus/src/dag/tests/reliable_broadcast_tests.rs b/consensus/src/dag/tests/reliable_broadcast_tests.rs index 4d374bdda51c2..12509ca76cb06 100644 --- a/consensus/src/dag/tests/reliable_broadcast_tests.rs +++ b/consensus/src/dag/tests/reliable_broadcast_tests.rs @@ -6,24 +6,28 @@ use crate::{ dag_network::DAGNetworkSender, dag_store::Dag, reliable_broadcast::{ - BroadcastStatus, NodeBroadcastHandleError, NodeBroadcastHandler, ReliableBroadcast, + BroadcastStatus, CertifiedNodeHandleError, CertifiedNodeHandler, + NodeBroadcastHandleError, NodeBroadcastHandler, ReliableBroadcast, }, - tests::dag_test::MockStorage, - types::{DAGMessage, Node, NodeCertificate, NodeDigestSignature, TestAck, TestMessage}, + tests::{ + dag_test::MockStorage, + helpers::{new_certified_node, new_node}, + }, + types::{DAGMessage, NodeCertificate, NodeDigestSignature, TestAck, TestMessage}, RpcHandler, }, network::TConsensusMsg, network_interface::ConsensusMsg, }; use anyhow::bail; -use aptos_consensus_types::common::{Author, Payload, Round}; +use aptos_consensus_types::common::Author; use aptos_infallible::{Mutex, RwLock}; use aptos_types::{ aggregate_signature::PartialSignatures, epoch_state::EpochState, validator_verifier::random_validator_verifier, }; use async_trait::async_trait; -use claims::assert_ok_eq; +use claims::{assert_ok, assert_ok_eq}; use futures::{ future::{AbortHandle, Abortable}, FutureExt, @@ -278,6 +282,29 @@ async fn test_node_broadcast_receiver_failure() { ); } -fn new_node(round: Round, timestamp: u64, author: Author, parents: Vec) -> Node { - Node::new(0, round, author, timestamp, Payload::empty(false), parents) +#[test] +fn test_certified_node_receiver() { + let (signers, validator_verifier) = random_validator_verifier(4, None, false); + let epoch_state = Arc::new(EpochState { + epoch: 1, + verifier: validator_verifier, + }); + let storage = Arc::new(MockStorage::new()); + let dag = Arc::new(RwLock::new(Dag::new(epoch_state, storage))); + + let node = new_certified_node(0, signers[0].author(), vec![]); + + let mut rb_receiver = CertifiedNodeHandler::new(dag); + + // expect an ack for a valid message + assert_ok!(rb_receiver.process(node.clone())); + // expect an ack again if the same message is sent again + assert_ok!(rb_receiver.process(node)); + + let parent_node = new_certified_node(0, signers[1].author(), vec![]); + let invalid_node = new_certified_node(1, signers[0].author(), vec![parent_node.certificate()]); + assert_eq!( + rb_receiver.process(invalid_node).unwrap_err().to_string(), + CertifiedNodeHandleError::MissingParents.to_string() + ); } diff --git a/consensus/src/dag/types.rs b/consensus/src/dag/types.rs index 1ff8ba46af1f4..6627e3c408a57 100644 --- a/consensus/src/dag/types.rs +++ b/consensus/src/dag/types.rs @@ -33,11 +33,7 @@ impl TDAGMessage for NodeDigestSignature { todo!() } } -impl TDAGMessage for NodeCertificate { - fn verify(&self, _verifier: &ValidatorVerifier) -> anyhow::Result<()> { - todo!() - } -} + impl TDAGMessage for CertifiedAck { fn verify(&self, _verifier: &ValidatorVerifier) -> anyhow::Result<()> { todo!() @@ -283,6 +279,16 @@ impl Deref for CertifiedNode { } } +impl TDAGMessage for CertifiedNode { + fn verify(&self, verifier: &ValidatorVerifier) -> anyhow::Result<()> { + let node_digest = NodeDigest::new(self.digest()); + + verifier + .verify_multi_signatures(&node_digest, self.certificate().signatures()) + .map_err(|e| anyhow::anyhow!("unable to verify: {}", e)) + } +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct NodeDigestSignature { epoch: u64, @@ -363,10 +369,16 @@ pub struct CertifiedAck { epoch: u64, } +impl CertifiedAck { + pub fn new(epoch: u64) -> Self { + Self { epoch } + } +} + impl BroadcastStatus for CertificateAckState { type Ack = CertifiedAck; type Aggregated = (); - type Message = NodeCertificate; + type Message = CertifiedNode; fn add(&mut self, peer: Author, _ack: Self::Ack) -> anyhow::Result> { self.received.insert(peer); @@ -431,7 +443,7 @@ pub struct DAGNetworkMessage { pub enum DAGMessage { NodeMsg(Node), NodeDigestSignatureMsg(NodeDigestSignature), - NodeCertificateMsg(NodeCertificate), + CertifiedNodeMsg(CertifiedNode), CertifiedAckMsg(CertifiedAck), FetchRequest(RemoteFetchRequest), FetchResponse(FetchResponse), @@ -447,7 +459,7 @@ impl DAGMessage { match self { DAGMessage::NodeMsg(_) => "NodeMsg", DAGMessage::NodeDigestSignatureMsg(_) => "NodeDigestSignatureMsg", - DAGMessage::NodeCertificateMsg(_) => "NodeCertificateMsg", + DAGMessage::CertifiedNodeMsg(_) => "CertifiedNodeMsg", DAGMessage::CertifiedAckMsg(_) => "CertifiedAckMsg", DAGMessage::FetchRequest(_) => "FetchRequest", DAGMessage::FetchResponse(_) => "FetchResponse", @@ -464,7 +476,7 @@ impl TConsensusMsg for DAGMessage { match self { DAGMessage::NodeMsg(node) => node.metadata.epoch, DAGMessage::NodeDigestSignatureMsg(signature) => signature.epoch, - DAGMessage::NodeCertificateMsg(certificate) => certificate.metadata.epoch, + DAGMessage::CertifiedNodeMsg(node) => node.metadata.epoch, DAGMessage::CertifiedAckMsg(ack) => ack.epoch, DAGMessage::FetchRequest(req) => req.target.epoch, DAGMessage::FetchResponse(res) => res.epoch,