From 5680f2d47c2cd99cf1983fb64458df3e04246945 Mon Sep 17 00:00:00 2001
From: Zekun Li
Date: Thu, 22 Jun 2023 18:36:56 -0700
Subject: [PATCH] [dag] basic dag fetcher
Add skeleton for dag fetcher that sends a bitmask of local dag and target node to remote and receives missing nodes back.
---
.../consensus-types/src/proof_of_store.rs | 2 +-
.../consensus-types/src/timeout_2chain.rs | 4 +-
consensus/src/dag/dag_fetcher.rs | 134 ++++++++++++++++++
consensus/src/dag/mod.rs | 1 +
consensus/src/dag/reliable_broadcast.rs | 13 +-
.../src/dag/tests/reliable_broadcast_tests.rs | 13 +-
consensus/src/dag/types.rs | 40 +++++-
types/src/aggregate_signature.rs | 4 +-
types/src/ledger_info.rs | 4 +-
types/src/validator_verifier.rs | 14 +-
10 files changed, 199 insertions(+), 30 deletions(-)
create mode 100644 consensus/src/dag/dag_fetcher.rs
diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs
index 3fd0972658ede..dd9f18096f333 100644
--- a/consensus/consensus-types/src/proof_of_store.rs
+++ b/consensus/consensus-types/src/proof_of_store.rs
@@ -310,7 +310,7 @@ impl ProofOfStore {
pub fn shuffled_signers(&self, validator: &ValidatorVerifier) -> Vec {
let mut ret: Vec = self
.multi_signature
- .get_voter_addresses(&validator.get_ordered_account_addresses());
+ .get_signers_addresses(&validator.get_ordered_account_addresses());
ret.shuffle(&mut thread_rng());
ret
}
diff --git a/consensus/consensus-types/src/timeout_2chain.rs b/consensus/consensus-types/src/timeout_2chain.rs
index a0bbe49fffde5..9fc0ced6f3c50 100644
--- a/consensus/consensus-types/src/timeout_2chain.rs
+++ b/consensus/consensus-types/src/timeout_2chain.rs
@@ -368,7 +368,7 @@ impl AggregateSignatureWithRounds {
&self,
ordered_validator_addresses: &[AccountAddress],
) -> Vec {
- self.sig.get_voter_addresses(ordered_validator_addresses)
+ self.sig.get_signers_addresses(ordered_validator_addresses)
}
pub fn get_voters_and_rounds(
@@ -376,7 +376,7 @@ impl AggregateSignatureWithRounds {
ordered_validator_addresses: &[AccountAddress],
) -> Vec<(AccountAddress, Round)> {
self.sig
- .get_voter_addresses(ordered_validator_addresses)
+ .get_signers_addresses(ordered_validator_addresses)
.into_iter()
.zip(self.rounds.clone())
.collect()
diff --git a/consensus/src/dag/dag_fetcher.rs b/consensus/src/dag/dag_fetcher.rs
new file mode 100644
index 0000000000000..194b4df2e7869
--- /dev/null
+++ b/consensus/src/dag/dag_fetcher.rs
@@ -0,0 +1,134 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::dag::{
+ dag_store::Dag,
+ types::{CertifiedNode, DAGMessage, DAGNetworkSender, Node, NodeMetadata},
+};
+use aptos_consensus_types::common::{Author, Round};
+use aptos_infallible::RwLock;
+use aptos_logger::error;
+use aptos_types::{epoch_state::EpochState, validator_verifier::ValidatorVerifier};
+use serde::{Deserialize, Serialize};
+use std::{sync::Arc, time::Duration};
+use tokio::sync::{
+ mpsc::{Receiver, Sender},
+ oneshot,
+};
+
+/// Represents a request to fetch missing dependencies for `target`, `start_round` represents
+/// the first round we care about in the DAG, `exists_bitmask` is a two dimensional bitmask represents
+/// if a node exist at [start_round + index][validator_index].
+#[derive(Serialize, Deserialize, Clone)]
+struct FetchRequest {
+ target: NodeMetadata,
+ start_round: Round,
+ exists_bitmask: Vec>,
+}
+
+/// Represents a response to FetchRequest, `certified_nodes` are indexed by [round][validator_index]
+/// It should fill in gaps from the `exists_bitmask` according to the parents from the `target_digest` node.
+#[derive(Serialize, Deserialize, Clone)]
+struct FetchResponse {
+ epoch: u64,
+ certifies_nodes: Vec>,
+}
+
+impl FetchResponse {
+ pub fn verify(
+ self,
+ _request: &FetchRequest,
+ _validator_verifier: &ValidatorVerifier,
+ ) -> anyhow::Result {
+ todo!("verification");
+ }
+}
+
+impl DAGMessage for FetchRequest {
+ fn epoch(&self) -> u64 {
+ self.target.epoch()
+ }
+}
+
+impl DAGMessage for FetchResponse {
+ fn epoch(&self) -> u64 {
+ self.epoch
+ }
+}
+
+enum FetchCallback {
+ Node(Node, oneshot::Sender),
+ CertifiedNode(CertifiedNode, oneshot::Sender),
+}
+
+impl FetchCallback {
+ pub fn responders(&self, validators: &[Author]) -> Vec {
+ match self {
+ FetchCallback::Node(node, _) => vec![*node.author()],
+ FetchCallback::CertifiedNode(node, _) => node.certificate().signers(validators),
+ }
+ }
+
+ pub fn notify(self) {
+ if match self {
+ FetchCallback::Node(node, sender) => sender.send(node).map_err(|_| ()),
+ FetchCallback::CertifiedNode(node, sender) => sender.send(node).map_err(|_| ()),
+ }
+ .is_err()
+ {
+ error!("Failed to send node back");
+ }
+ }
+}
+
+struct DagFetcher {
+ epoch_state: Arc,
+ network: Arc,
+ dag: Arc>,
+ request_rx: Receiver<(FetchRequest, FetchCallback)>,
+}
+
+impl DagFetcher {
+ pub fn new(
+ epoch_state: Arc,
+ network: Arc,
+ dag: Arc>,
+ ) -> (Self, Sender<(FetchRequest, FetchCallback)>) {
+ let (request_tx, request_rx) = tokio::sync::mpsc::channel(16);
+ (
+ Self {
+ epoch_state,
+ network,
+ dag,
+ request_rx,
+ },
+ request_tx,
+ )
+ }
+
+ pub async fn start(mut self) {
+ while let Some((request, callback)) = self.request_rx.recv().await {
+ let responders =
+ callback.responders(&self.epoch_state.verifier.get_ordered_account_addresses());
+ let network_request = request.clone().into_network_message();
+ if let Ok(response) = self
+ .network
+ .send_rpc_with_fallbacks(responders, network_request, Duration::from_secs(1))
+ .await
+ .and_then(FetchResponse::from_network_message)
+ .and_then(|response| response.verify(&request, &self.epoch_state.verifier))
+ {
+ // TODO: support chunk response or fallback to state sync
+ let mut dag_writer = self.dag.write();
+ for rounds in response.certifies_nodes {
+ for node in rounds {
+ if let Err(e) = dag_writer.add_node(node) {
+ error!("Failed to add node {}", e);
+ }
+ }
+ }
+ callback.notify();
+ }
+ }
+ }
+}
diff --git a/consensus/src/dag/mod.rs b/consensus/src/dag/mod.rs
index eced7bc2c966c..39ece5a63c42e 100644
--- a/consensus/src/dag/mod.rs
+++ b/consensus/src/dag/mod.rs
@@ -3,6 +3,7 @@
#![allow(dead_code)]
mod dag_driver;
+mod dag_fetcher;
mod dag_store;
mod reliable_broadcast;
#[cfg(test)]
diff --git a/consensus/src/dag/reliable_broadcast.rs b/consensus/src/dag/reliable_broadcast.rs
index 2d3367ac81beb..7f4c179b3d3ff 100644
--- a/consensus/src/dag/reliable_broadcast.rs
+++ b/consensus/src/dag/reliable_broadcast.rs
@@ -1,9 +1,8 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0
-use crate::{dag::types::DAGMessage, network_interface::ConsensusMsg};
+use crate::dag::types::{DAGMessage, DAGNetworkSender};
use aptos_consensus_types::common::Author;
-use async_trait::async_trait;
use futures::{stream::FuturesUnordered, StreamExt};
use std::{future::Future, sync::Arc, time::Duration};
@@ -15,16 +14,6 @@ pub trait BroadcastStatus {
fn add(&mut self, peer: Author, ack: Self::Ack) -> anyhow::Result