Skip to content

Commit

Permalink
[dag] certified node handler (#8919)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun authored Jul 6, 2023
1 parent da278bf commit 0982a4f
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 35 deletions.
7 changes: 5 additions & 2 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,11 @@ impl DagDriver {
let cert_ack_set = CertificateAckState::new(self.epoch_state.verifier.len());
let task = self
.reliable_broadcast
.broadcast(node, signature_builder)
.then(move |certificate| rb.broadcast(certificate, cert_ack_set));
.broadcast(node.clone(), signature_builder)
.then(move |certificate| {
let certified_node = CertifiedNode::new(node, certificate.signatures().to_owned());
rb.broadcast(certified_node, cert_ack_set)
});
tokio::spawn(Abortable::new(task, abort_registration));
if let Some(prev_handle) = self.rb_abort_handle.replace(abort_handle) {
prev_handle.abort();
Expand Down
21 changes: 19 additions & 2 deletions consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright © Aptos Foundation

use super::{reliable_broadcast::CertifiedNodeHandler, types::TDAGMessage};
use crate::{
dag::{
dag_network::RpcHandler, dag_store::Dag, reliable_broadcast::NodeBroadcastHandler,
Expand All @@ -20,6 +21,8 @@ use std::sync::Arc;
struct NetworkHandler {
dag_rpc_rx: aptos_channel::Receiver<Author, IncomingDAGRequest>,
node_receiver: NodeBroadcastHandler,
certified_node_receiver: CertifiedNodeHandler,
epoch_state: Arc<EpochState>,
}

impl NetworkHandler {
Expand All @@ -31,7 +34,13 @@ impl NetworkHandler {
) -> Self {
Self {
dag_rpc_rx,
node_receiver: NodeBroadcastHandler::new(dag, signer, epoch_state.verifier.clone()),
node_receiver: NodeBroadcastHandler::new(
dag.clone(),
signer,
epoch_state.verifier.clone(),
),
certified_node_receiver: CertifiedNodeHandler::new(dag),
epoch_state,
}
}

Expand All @@ -45,8 +54,16 @@ impl NetworkHandler {

async fn process_rpc(&mut self, rpc_request: IncomingDAGRequest) -> anyhow::Result<()> {
let dag_message: DAGMessage = TConsensusMsg::from_network_message(rpc_request.req)?;

let response: anyhow::Result<DAGMessage> = match dag_message {
DAGMessage::NodeMsg(node) => self.node_receiver.process(node).map(|r| r.into()),
DAGMessage::NodeMsg(node) => node
.verify(&self.epoch_state.verifier)
.and_then(|_| self.node_receiver.process(node))
.map(|r| r.into()),
DAGMessage::CertifiedNodeMsg(node) => node
.verify(&self.epoch_state.verifier)
.and_then(|_| self.certified_node_receiver.process(node))
.map(|r| r.into()),
_ => {
error!("unknown rpc message {:?}", dag_message);
Err(anyhow::anyhow!("unknown rpc message"))
Expand Down
45 changes: 45 additions & 0 deletions consensus/src/dag/reliable_broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::types::{CertifiedAck, CertifiedNode};
use crate::{
dag::{
dag_network::{DAGNetworkSender, RpcHandler},
Expand Down Expand Up @@ -118,6 +119,7 @@ impl NodeBroadcastHandler {

fn validate(&self, node: &Node) -> anyhow::Result<()> {
let current_round = node.metadata().round();

// round 0 is a special case and does not require any parents
if current_round == 0 {
return Ok(());
Expand Down Expand Up @@ -184,3 +186,46 @@ impl RpcHandler for NodeBroadcastHandler {
}
}
}

#[derive(Debug, ThisError)]
pub enum CertifiedNodeHandleError {
#[error("node already exists")]
NodeExists,
#[error("missing parents")]
MissingParents,
}

pub struct CertifiedNodeHandler {
dag: Arc<RwLock<Dag>>,
}

impl CertifiedNodeHandler {
pub fn new(dag: Arc<RwLock<Dag>>) -> Self {
Self { dag }
}
}

impl RpcHandler for CertifiedNodeHandler {
type Request = CertifiedNode;
type Response = CertifiedAck;

fn process(&mut self, node: Self::Request) -> anyhow::Result<Self::Response> {
{
let dag_reader = self.dag.read();
if dag_reader.exists(&node.digest()) {
return Ok(CertifiedAck::new(node.metadata().epoch()));
}

if !dag_reader.all_exists(node.parents()) {
// TODO(ibalajiarun): implement fetching logic.
bail!(CertifiedNodeHandleError::MissingParents);
}
}

let mut dag_writer = self.dag.write();
let epoch = node.metadata().epoch();
dag_writer.add_node(node)?;

Ok(CertifiedAck::new(epoch))
}
}
18 changes: 3 additions & 15 deletions consensus/src/dag/tests/dag_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@
use crate::dag::{
dag_store::Dag,
storage::DAGStorage,
types::{CertifiedNode, Node, NodeCertificate},
tests::helpers::new_certified_node,
types::{CertifiedNode, Node},
};
use aptos_consensus_types::common::{Author, Payload, Round};
use aptos_crypto::HashValue;
use aptos_infallible::Mutex;
use aptos_types::{
aggregate_signature::AggregateSignature, epoch_state::EpochState,
validator_verifier::random_validator_verifier,
};
use aptos_types::{epoch_state::EpochState, validator_verifier::random_validator_verifier};
use std::{collections::HashMap, sync::Arc};

pub struct MockStorage {
Expand Down Expand Up @@ -170,12 +167,3 @@ fn test_dag_recover_from_storage() {
let _new_epoch_dag = Dag::new(new_epoch_state, storage.clone());
assert!(storage.certified_node_data.lock().is_empty());
}

fn new_certified_node(
round: Round,
author: Author,
parents: Vec<NodeCertificate>,
) -> CertifiedNode {
let node = Node::new(1, round, author, 0, Payload::empty(false), parents);
CertifiedNode::new(node, AggregateSignature::empty())
}
23 changes: 23 additions & 0 deletions consensus/src/dag/tests/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright © Aptos Foundation

use crate::dag::types::{CertifiedNode, Node, NodeCertificate};
use aptos_consensus_types::common::{Author, Payload, Round};
use aptos_types::aggregate_signature::AggregateSignature;

pub(crate) fn new_certified_node(
round: Round,
author: Author,
parents: Vec<NodeCertificate>,
) -> CertifiedNode {
let node = Node::new(1, round, author, 0, Payload::empty(false), parents);
CertifiedNode::new(node, AggregateSignature::empty())
}

pub(crate) fn new_node(
round: Round,
timestamp: u64,
author: Author,
parents: Vec<NodeCertificate>,
) -> Node {
Node::new(0, round, author, timestamp, Payload::empty(false), parents)
}
1 change: 1 addition & 0 deletions consensus/src/dag/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
// SPDX-License-Identifier: Apache-2.0

mod dag_test;
mod helpers;
mod reliable_broadcast_tests;
41 changes: 34 additions & 7 deletions consensus/src/dag/tests/reliable_broadcast_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,28 @@ use crate::{
dag_network::DAGNetworkSender,
dag_store::Dag,
reliable_broadcast::{
BroadcastStatus, NodeBroadcastHandleError, NodeBroadcastHandler, ReliableBroadcast,
BroadcastStatus, CertifiedNodeHandleError, CertifiedNodeHandler,
NodeBroadcastHandleError, NodeBroadcastHandler, ReliableBroadcast,
},
tests::dag_test::MockStorage,
types::{DAGMessage, Node, NodeCertificate, NodeDigestSignature, TestAck, TestMessage},
tests::{
dag_test::MockStorage,
helpers::{new_certified_node, new_node},
},
types::{DAGMessage, NodeCertificate, NodeDigestSignature, TestAck, TestMessage},
RpcHandler,
},
network::TConsensusMsg,
network_interface::ConsensusMsg,
};
use anyhow::bail;
use aptos_consensus_types::common::{Author, Payload, Round};
use aptos_consensus_types::common::Author;
use aptos_infallible::{Mutex, RwLock};
use aptos_types::{
aggregate_signature::PartialSignatures, epoch_state::EpochState,
validator_verifier::random_validator_verifier,
};
use async_trait::async_trait;
use claims::assert_ok_eq;
use claims::{assert_ok, assert_ok_eq};
use futures::{
future::{AbortHandle, Abortable},
FutureExt,
Expand Down Expand Up @@ -278,6 +282,29 @@ async fn test_node_broadcast_receiver_failure() {
);
}

fn new_node(round: Round, timestamp: u64, author: Author, parents: Vec<NodeCertificate>) -> Node {
Node::new(0, round, author, timestamp, Payload::empty(false), parents)
#[test]
fn test_certified_node_receiver() {
let (signers, validator_verifier) = random_validator_verifier(4, None, false);
let epoch_state = Arc::new(EpochState {
epoch: 1,
verifier: validator_verifier,
});
let storage = Arc::new(MockStorage::new());
let dag = Arc::new(RwLock::new(Dag::new(epoch_state, storage)));

let node = new_certified_node(0, signers[0].author(), vec![]);

let mut rb_receiver = CertifiedNodeHandler::new(dag);

// expect an ack for a valid message
assert_ok!(rb_receiver.process(node.clone()));
// expect an ack again if the same message is sent again
assert_ok!(rb_receiver.process(node));

let parent_node = new_certified_node(0, signers[1].author(), vec![]);
let invalid_node = new_certified_node(1, signers[0].author(), vec![parent_node.certificate()]);
assert_eq!(
rb_receiver.process(invalid_node).unwrap_err().to_string(),
CertifiedNodeHandleError::MissingParents.to_string()
);
}
30 changes: 21 additions & 9 deletions consensus/src/dag/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ impl TDAGMessage for NodeDigestSignature {
todo!()
}
}
impl TDAGMessage for NodeCertificate {
fn verify(&self, _verifier: &ValidatorVerifier) -> anyhow::Result<()> {
todo!()
}
}

impl TDAGMessage for CertifiedAck {
fn verify(&self, _verifier: &ValidatorVerifier) -> anyhow::Result<()> {
todo!()
Expand Down Expand Up @@ -283,6 +279,16 @@ impl Deref for CertifiedNode {
}
}

impl TDAGMessage for CertifiedNode {
fn verify(&self, verifier: &ValidatorVerifier) -> anyhow::Result<()> {
let node_digest = NodeDigest::new(self.digest());

verifier
.verify_multi_signatures(&node_digest, self.certificate().signatures())
.map_err(|e| anyhow::anyhow!("unable to verify: {}", e))
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct NodeDigestSignature {
epoch: u64,
Expand Down Expand Up @@ -363,10 +369,16 @@ pub struct CertifiedAck {
epoch: u64,
}

impl CertifiedAck {
pub fn new(epoch: u64) -> Self {
Self { epoch }
}
}

impl BroadcastStatus for CertificateAckState {
type Ack = CertifiedAck;
type Aggregated = ();
type Message = NodeCertificate;
type Message = CertifiedNode;

fn add(&mut self, peer: Author, _ack: Self::Ack) -> anyhow::Result<Option<Self::Aggregated>> {
self.received.insert(peer);
Expand Down Expand Up @@ -431,7 +443,7 @@ pub struct DAGNetworkMessage {
pub enum DAGMessage {
NodeMsg(Node),
NodeDigestSignatureMsg(NodeDigestSignature),
NodeCertificateMsg(NodeCertificate),
CertifiedNodeMsg(CertifiedNode),
CertifiedAckMsg(CertifiedAck),
FetchRequest(RemoteFetchRequest),
FetchResponse(FetchResponse),
Expand All @@ -447,7 +459,7 @@ impl DAGMessage {
match self {
DAGMessage::NodeMsg(_) => "NodeMsg",
DAGMessage::NodeDigestSignatureMsg(_) => "NodeDigestSignatureMsg",
DAGMessage::NodeCertificateMsg(_) => "NodeCertificateMsg",
DAGMessage::CertifiedNodeMsg(_) => "CertifiedNodeMsg",
DAGMessage::CertifiedAckMsg(_) => "CertifiedAckMsg",
DAGMessage::FetchRequest(_) => "FetchRequest",
DAGMessage::FetchResponse(_) => "FetchResponse",
Expand All @@ -464,7 +476,7 @@ impl TConsensusMsg for DAGMessage {
match self {
DAGMessage::NodeMsg(node) => node.metadata.epoch,
DAGMessage::NodeDigestSignatureMsg(signature) => signature.epoch,
DAGMessage::NodeCertificateMsg(certificate) => certificate.metadata.epoch,
DAGMessage::CertifiedNodeMsg(node) => node.metadata.epoch,
DAGMessage::CertifiedAckMsg(ack) => ack.epoch,
DAGMessage::FetchRequest(req) => req.target.epoch,
DAGMessage::FetchResponse(res) => res.epoch,
Expand Down

0 comments on commit 0982a4f

Please sign in to comment.