Skip to content

Commit

Permalink
[dag] basic dag fetcher
Browse files Browse the repository at this point in the history
Add skeleton for dag fetcher that sends a bitmask of local dag and target node to remote and receives missing nodes back.
  • Loading branch information
zekun000 committed Jun 26, 2023
1 parent 926a849 commit 5680f2d
Showing 10 changed files with 199 additions and 30 deletions.
2 changes: 1 addition & 1 deletion consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
@@ -310,7 +310,7 @@ impl ProofOfStore {
pub fn shuffled_signers(&self, validator: &ValidatorVerifier) -> Vec<PeerId> {
let mut ret: Vec<PeerId> = self
.multi_signature
.get_voter_addresses(&validator.get_ordered_account_addresses());
.get_signers_addresses(&validator.get_ordered_account_addresses());
ret.shuffle(&mut thread_rng());
ret
}
4 changes: 2 additions & 2 deletions consensus/consensus-types/src/timeout_2chain.rs
Original file line number Diff line number Diff line change
@@ -368,15 +368,15 @@ impl AggregateSignatureWithRounds {
&self,
ordered_validator_addresses: &[AccountAddress],
) -> Vec<AccountAddress> {
self.sig.get_voter_addresses(ordered_validator_addresses)
self.sig.get_signers_addresses(ordered_validator_addresses)
}

pub fn get_voters_and_rounds(
&self,
ordered_validator_addresses: &[AccountAddress],
) -> Vec<(AccountAddress, Round)> {
self.sig
.get_voter_addresses(ordered_validator_addresses)
.get_signers_addresses(ordered_validator_addresses)
.into_iter()
.zip(self.rounds.clone())
.collect()
134 changes: 134 additions & 0 deletions consensus/src/dag/dag_fetcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::dag::{
dag_store::Dag,
types::{CertifiedNode, DAGMessage, DAGNetworkSender, Node, NodeMetadata},
};
use aptos_consensus_types::common::{Author, Round};
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_types::{epoch_state::EpochState, validator_verifier::ValidatorVerifier};
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use tokio::sync::{
mpsc::{Receiver, Sender},
oneshot,
};

/// Represents a request to fetch missing dependencies for `target`, `start_round` represents
/// the first round we care about in the DAG, `exists_bitmask` is a two dimensional bitmask represents
/// if a node exist at [start_round + index][validator_index].
#[derive(Serialize, Deserialize, Clone)]
struct FetchRequest {
target: NodeMetadata,
start_round: Round,
exists_bitmask: Vec<Vec<bool>>,
}

/// Represents a response to FetchRequest, `certified_nodes` are indexed by [round][validator_index]
/// It should fill in gaps from the `exists_bitmask` according to the parents from the `target_digest` node.
#[derive(Serialize, Deserialize, Clone)]
struct FetchResponse {
epoch: u64,
certifies_nodes: Vec<Vec<CertifiedNode>>,
}

impl FetchResponse {
pub fn verify(
self,
_request: &FetchRequest,
_validator_verifier: &ValidatorVerifier,
) -> anyhow::Result<Self> {
todo!("verification");
}
}

impl DAGMessage for FetchRequest {
fn epoch(&self) -> u64 {
self.target.epoch()
}
}

impl DAGMessage for FetchResponse {
fn epoch(&self) -> u64 {
self.epoch
}
}

enum FetchCallback {
Node(Node, oneshot::Sender<Node>),
CertifiedNode(CertifiedNode, oneshot::Sender<CertifiedNode>),
}

impl FetchCallback {
pub fn responders(&self, validators: &[Author]) -> Vec<Author> {
match self {
FetchCallback::Node(node, _) => vec![*node.author()],
FetchCallback::CertifiedNode(node, _) => node.certificate().signers(validators),
}
}

pub fn notify(self) {
if match self {
FetchCallback::Node(node, sender) => sender.send(node).map_err(|_| ()),
FetchCallback::CertifiedNode(node, sender) => sender.send(node).map_err(|_| ()),
}
.is_err()
{
error!("Failed to send node back");
}
}
}

struct DagFetcher {
epoch_state: Arc<EpochState>,
network: Arc<dyn DAGNetworkSender>,
dag: Arc<RwLock<Dag>>,
request_rx: Receiver<(FetchRequest, FetchCallback)>,
}

impl DagFetcher {
pub fn new(
epoch_state: Arc<EpochState>,
network: Arc<dyn DAGNetworkSender>,
dag: Arc<RwLock<Dag>>,
) -> (Self, Sender<(FetchRequest, FetchCallback)>) {
let (request_tx, request_rx) = tokio::sync::mpsc::channel(16);
(
Self {
epoch_state,
network,
dag,
request_rx,
},
request_tx,
)
}

pub async fn start(mut self) {
while let Some((request, callback)) = self.request_rx.recv().await {
let responders =
callback.responders(&self.epoch_state.verifier.get_ordered_account_addresses());
let network_request = request.clone().into_network_message();
if let Ok(response) = self
.network
.send_rpc_with_fallbacks(responders, network_request, Duration::from_secs(1))
.await
.and_then(FetchResponse::from_network_message)
.and_then(|response| response.verify(&request, &self.epoch_state.verifier))
{
// TODO: support chunk response or fallback to state sync
let mut dag_writer = self.dag.write();
for rounds in response.certifies_nodes {
for node in rounds {
if let Err(e) = dag_writer.add_node(node) {
error!("Failed to add node {}", e);
}
}
}
callback.notify();
}
}
}
}
1 change: 1 addition & 0 deletions consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@
#![allow(dead_code)]

mod dag_driver;
mod dag_fetcher;
mod dag_store;
mod reliable_broadcast;
#[cfg(test)]
13 changes: 1 addition & 12 deletions consensus/src/dag/reliable_broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{dag::types::DAGMessage, network_interface::ConsensusMsg};
use crate::dag::types::{DAGMessage, DAGNetworkSender};
use aptos_consensus_types::common::Author;
use async_trait::async_trait;
use futures::{stream::FuturesUnordered, StreamExt};
use std::{future::Future, sync::Arc, time::Duration};

@@ -15,16 +14,6 @@ pub trait BroadcastStatus {
fn add(&mut self, peer: Author, ack: Self::Ack) -> anyhow::Result<Option<Self::Aggregated>>;
}

#[async_trait]
pub trait DAGNetworkSender: Send + Sync {
async fn send_rpc(
&self,
receiver: Author,
message: ConsensusMsg,
timeout: Duration,
) -> anyhow::Result<ConsensusMsg>;
}

pub struct ReliableBroadcast {
validators: Vec<Author>,
network_sender: Arc<dyn DAGNetworkSender>,
13 changes: 11 additions & 2 deletions consensus/src/dag/tests/reliable_broadcast_tests.rs
Original file line number Diff line number Diff line change
@@ -3,8 +3,8 @@

use crate::{
dag::{
reliable_broadcast::{BroadcastStatus, DAGNetworkSender, ReliableBroadcast},
types::DAGMessage,
reliable_broadcast::{BroadcastStatus, ReliableBroadcast},
types::{DAGMessage, DAGNetworkSender},
},
network_interface::ConsensusMsg,
};
@@ -101,6 +101,15 @@ impl DAGNetworkSender for TestDAGSender {
.insert(receiver, TestMessage::from_network_message(message)?);
Ok(TestAck.into_network_message())
}

async fn send_rpc_with_fallbacks(
&self,
_responders: Vec<Author>,
_message: ConsensusMsg,
_timeout: Duration,
) -> anyhow::Result<ConsensusMsg> {
unimplemented!();
}
}

#[tokio::test]
40 changes: 38 additions & 2 deletions consensus/src/dag/types.rs
Original file line number Diff line number Diff line change
@@ -14,8 +14,9 @@ use aptos_types::{
aggregate_signature::{AggregateSignature, PartialSignatures},
epoch_state::EpochState,
};
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{collections::HashSet, ops::Deref, sync::Arc};
use std::{collections::HashSet, ops::Deref, sync::Arc, time::Duration};

pub trait DAGMessage: Sized + Clone + Serialize + DeserializeOwned {
fn epoch(&self) -> u64;
@@ -35,6 +36,25 @@ pub trait DAGMessage: Sized + Clone + Serialize + DeserializeOwned {
}
}

#[async_trait]
pub trait DAGNetworkSender: Send + Sync {
async fn send_rpc(
&self,
receiver: Author,
message: ConsensusMsg,
timeout: Duration,
) -> anyhow::Result<ConsensusMsg>;

/// Given a list of potential responders, sending rpc to get response from any of them and could
/// fallback to more in case of failures.
async fn send_rpc_with_fallbacks(
&self,
responders: Vec<Author>,
message: ConsensusMsg,
timeout: Duration,
) -> anyhow::Result<ConsensusMsg>;
}

/// Represents the metadata about the node, without payload and parents from Node
#[derive(Clone, Serialize, Deserialize)]
pub struct NodeMetadata {
@@ -57,6 +77,10 @@ impl NodeMetadata {
pub fn author(&self) -> &Author {
&self.author
}

pub fn epoch(&self) -> u64 {
self.epoch
}
}

/// Node representation in the DAG, parents contain 2f+1 strong links (links to previous round)
@@ -143,6 +167,10 @@ impl Node {
pub fn parents(&self) -> &[NodeMetadata] {
&self.parents
}

pub fn author(&self) -> &Author {
self.metadata.author()
}
}

/// Quorum signatures over the node digest
@@ -161,9 +189,13 @@ impl NodeCertificate {
signatures,
}
}

pub fn signers(&self, validators: &[Author]) -> Vec<Author> {
self.signatures.get_signers_addresses(validators)
}
}

#[derive(Clone)]
#[derive(Serialize, Deserialize, Clone)]
pub struct CertifiedNode {
node: Node,
certificate: NodeCertificate,
@@ -173,6 +205,10 @@ impl CertifiedNode {
pub fn new(node: Node, certificate: NodeCertificate) -> Self {
Self { node, certificate }
}

pub fn certificate(&self) -> &NodeCertificate {
&self.certificate
}
}

impl Deref for CertifiedNode {
4 changes: 2 additions & 2 deletions types/src/aggregate_signature.rs
Original file line number Diff line number Diff line change
@@ -36,11 +36,11 @@ impl AggregateSignature {
}
}

pub fn get_voters_bitvec(&self) -> &BitVec {
pub fn get_signers_bitvec(&self) -> &BitVec {
&self.validator_bitmask
}

pub fn get_voter_addresses(
pub fn get_signers_addresses(
&self,
validator_addresses: &[AccountAddress],
) -> Vec<AccountAddress> {
4 changes: 2 additions & 2 deletions types/src/ledger_info.rs
Original file line number Diff line number Diff line change
@@ -258,15 +258,15 @@ impl LedgerInfoWithV0 {
}

pub fn get_voters(&self, validator_addresses: &[AccountAddress]) -> Vec<AccountAddress> {
self.signatures.get_voter_addresses(validator_addresses)
self.signatures.get_signers_addresses(validator_addresses)
}

pub fn get_num_voters(&self) -> usize {
self.signatures.get_num_voters()
}

pub fn get_voters_bitvec(&self) -> &BitVec {
self.signatures.get_voters_bitvec()
self.signatures.get_signers_bitvec()
}

pub fn verify_signatures(
14 changes: 7 additions & 7 deletions types/src/validator_verifier.rs
Original file line number Diff line number Diff line change
@@ -231,10 +231,10 @@ impl ValidatorVerifier {
multi_signature: &AggregateSignature,
) -> std::result::Result<(), VerifyError> {
// Verify the number of signature is not greater than expected.
Self::check_num_of_voters(self.len() as u16, multi_signature.get_voters_bitvec())?;
Self::check_num_of_voters(self.len() as u16, multi_signature.get_signers_bitvec())?;
let mut pub_keys = vec![];
let mut authors = vec![];
for index in multi_signature.get_voters_bitvec().iter_ones() {
for index in multi_signature.get_signers_bitvec().iter_ones() {
let validator = self
.validator_infos
.get(index)
@@ -274,10 +274,10 @@ impl ValidatorVerifier {
aggregated_signature: &AggregateSignature,
) -> std::result::Result<(), VerifyError> {
// Verify the number of signature is not greater than expected.
Self::check_num_of_voters(self.len() as u16, aggregated_signature.get_voters_bitvec())?;
Self::check_num_of_voters(self.len() as u16, aggregated_signature.get_signers_bitvec())?;
let mut pub_keys = vec![];
let mut authors = vec![];
for index in aggregated_signature.get_voters_bitvec().iter_ones() {
for index in aggregated_signature.get_signers_bitvec().iter_ones() {
let validator = self
.validator_infos
.get(index)
@@ -679,7 +679,7 @@ mod tests {
.aggregate_signatures(&partial_signature)
.unwrap();
assert_eq!(
aggregated_signature.get_voters_bitvec().num_buckets(),
aggregated_signature.get_signers_bitvec().num_buckets(),
BitVec::required_buckets(validator_verifier.validator_infos.len() as u16)
);
// Check against signatures == N; this will pass.
@@ -709,7 +709,7 @@ mod tests {
.aggregate_signatures(&partial_signature)
.unwrap();
assert_eq!(
aggregated_signature.get_voters_bitvec().num_buckets(),
aggregated_signature.get_signers_bitvec().num_buckets(),
BitVec::required_buckets(validator_verifier.validator_infos.len() as u16)
);
assert_eq!(
@@ -737,7 +737,7 @@ mod tests {
.aggregate_signatures(&partial_signature)
.unwrap();
assert_eq!(
aggregated_signature.get_voters_bitvec().num_buckets(),
aggregated_signature.get_signers_bitvec().num_buckets(),
BitVec::required_buckets(validator_verifier.validator_infos.len() as u16)
);
assert_eq!(

0 comments on commit 5680f2d

Please sign in to comment.