Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dag] Integrate dag fetcher with RB node handler #9585

Merged
merged 1 commit into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ pub fn bootstrap_dag(
time_service.clone(),
));

let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone())));
let dag = Arc::new(RwLock::new(Dag::new(
epoch_state.clone(),
storage.clone(),
current_round,
)));

let anchor_election = Box::new(RoundRobinAnchorElection::new(validators));
let order_rule = OrderRule::new(
Expand Down Expand Up @@ -86,10 +90,15 @@ pub fn bootstrap_dag(
time_service,
storage.clone(),
order_rule,
fetch_requester.clone(),
);
let rb_handler = NodeBroadcastHandler::new(
dag.clone(),
signer,
epoch_state.clone(),
storage.clone(),
fetch_requester,
);
let rb_handler =
NodeBroadcastHandler::new(dag.clone(), signer, epoch_state.clone(), storage.clone());
let fetch_handler = FetchRequestHandler::new(dag, epoch_state.clone());

let dag_handler = NetworkHandler::new(
Expand Down
1 change: 1 addition & 0 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use super::{
};
use crate::{
dag::{
dag_fetcher::TFetchRequester,
dag_store::Dag,
types::{CertificateAckState, CertifiedNode, Node, NodeCertificate, SignatureBuilder},
},
Expand Down
11 changes: 8 additions & 3 deletions consensus/src/dag/dag_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,19 @@ impl<T> Stream for FetchWaiter<T> {
}
}

/// Interface for submitting fetch requests to the DAG fetcher.
///
/// Implementors must be `Send + Sync`. Both methods take the node by value and
/// only report whether the request was accepted (`Ok(())`) or failed (an
/// `anyhow` error); neither returns fetched data to the caller.
pub trait TFetchRequester: Send + Sync {
/// Submits a fetch request on behalf of `node`.
///
/// NOTE(review): presumably used to fetch the node's missing parents — confirm
/// against the fetcher implementation.
fn request_for_node(&self, node: Node) -> anyhow::Result<()>;
/// Submits a fetch request on behalf of the certified `node`.
///
/// NOTE(review): presumably the certified-node counterpart of
/// `request_for_node` — confirm against the fetcher implementation.
fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()>;
}

/// Channel-backed handle used to submit local fetch requests.
pub struct FetchRequester {
/// Sender for `LocalFetchRequest` messages.
request_tx: Sender<LocalFetchRequest>,
/// Sender for oneshot receivers that resolve to a `Node`.
/// NOTE(review): presumably drained by a `FetchWaiter<Node>` — confirm.
node_waiter_tx: Sender<oneshot::Receiver<Node>>,
/// Sender for oneshot receivers that resolve to a `CertifiedNode`.
/// NOTE(review): presumably drained by a `FetchWaiter<CertifiedNode>` — confirm.
certified_node_waiter_tx: Sender<oneshot::Receiver<CertifiedNode>>,
}

impl FetchRequester {
pub fn request_for_node(&self, node: Node) -> anyhow::Result<()> {
impl TFetchRequester for FetchRequester {
fn request_for_node(&self, node: Node) -> anyhow::Result<()> {
let (res_tx, res_rx) = oneshot::channel();
let fetch_req = LocalFetchRequest::Node(node, res_tx);
self.request_tx
Expand All @@ -70,7 +75,7 @@ impl FetchRequester {
Ok(())
}

pub fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()> {
fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()> {
let (res_tx, res_rx) = oneshot::channel();
let fetch_req = LocalFetchRequest::CertifiedNode(node, res_tx);
self.request_tx.try_send(fetch_req).map_err(|e| {
Expand Down
12 changes: 9 additions & 3 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@ pub struct Dag {
/// Map between peer id to vector index
author_to_index: HashMap<Author, usize>,
storage: Arc<dyn DAGStorage>,
initial_round: Round,
}

impl Dag {
pub fn new(epoch_state: Arc<EpochState>, storage: Arc<dyn DAGStorage>) -> Self {
pub fn new(
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
initial_round: Round,
) -> Self {
let epoch = epoch_state.epoch;
let author_to_index = epoch_state.verifier.address_to_validator_index().clone();
let num_validators = author_to_index.len();
Expand Down Expand Up @@ -77,6 +82,7 @@ impl Dag {
nodes_by_round,
author_to_index,
storage,
initial_round,
}
}

Expand All @@ -85,15 +91,15 @@ impl Dag {
.nodes_by_round
.first_key_value()
.map(|(round, _)| round)
.unwrap_or(&0)
.unwrap_or(&self.initial_round)
}

pub fn highest_round(&self) -> Round {
*self
.nodes_by_round
.last_key_value()
.map(|(round, _)| round)
.unwrap_or(&0)
.unwrap_or(&self.initial_round)
}

pub fn add_node(&mut self, node: CertifiedNode) -> anyhow::Result<()> {
Expand Down
46 changes: 27 additions & 19 deletions consensus/src/dag/rb_handler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::{storage::DAGStorage, NodeId};
use super::{dag_fetcher::TFetchRequester, storage::DAGStorage, NodeId};
use crate::dag::{
dag_network::RpcHandler,
dag_store::Dag,
Expand All @@ -21,8 +21,8 @@ pub enum NodeBroadcastHandleError {
InvalidParent,
#[error("missing parents")]
MissingParents,
#[error("parents do not meet quorum voting power")]
NotEnoughParents,
#[error("stale round number")]
StaleRound(Round),
}

pub(crate) struct NodeBroadcastHandler {
Expand All @@ -31,6 +31,7 @@ pub(crate) struct NodeBroadcastHandler {
signer: ValidatorSigner,
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
fetch_requester: Arc<dyn TFetchRequester>,
}

impl NodeBroadcastHandler {
Expand All @@ -39,6 +40,7 @@ impl NodeBroadcastHandler {
signer: ValidatorSigner,
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
fetch_requester: Arc<dyn TFetchRequester>,
) -> Self {
let epoch = epoch_state.epoch;
let votes_by_round_peer = read_votes_from_storage(&storage, epoch);
Expand All @@ -49,6 +51,7 @@ impl NodeBroadcastHandler {
signer,
epoch_state,
storage,
fetch_requester,
}
}

Expand All @@ -67,21 +70,15 @@ impl NodeBroadcastHandler {
self.storage.delete_votes(to_delete)
}

fn validate(&self, node: &Node) -> anyhow::Result<()> {
fn validate(&self, node: Node) -> anyhow::Result<Node> {
let current_round = node.metadata().round();

// round 0 is a special case and does not require any parents
if current_round == 0 {
return Ok(());
}

let prev_round = current_round - 1;

let dag_reader = self.dag.read();
// check if the parent round is missing in the DAG
let lowest_round = dag_reader.lowest_round();

ensure!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need to filter out lower rounds, right? Otherwise we'd fetch all those nodes from GC'ed rounds.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a good point. I can send you a node from a very old round and force you to fetch if it is below the GC level. We should ignore nodes from rounds below the GC level.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I needed the DAG store to support this minimum round to fix this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's becoming hard to write unit tests without a verifier trait. I will follow up to introduce that trait, and then we can have more unit tests for this case. Added a TODO for now.

prev_round >= dag_reader.lowest_round(),
NodeBroadcastHandleError::MissingParents
current_round >= lowest_round,
NodeBroadcastHandleError::StaleRound(current_round)
);

// check which parents are missing in the DAG
Expand All @@ -91,19 +88,30 @@ impl NodeBroadcastHandler {
.filter(|parent| !dag_reader.exists(parent.metadata()))
.cloned()
.collect();
drop(dag_reader); // Drop the DAG store early as it is no longer required

if !missing_parents.is_empty() {
// For each missing parent, verify their signatures and voting power
// For each missing parent, verify their signatures and voting power.
// Otherwise, a malicious node can send bad nodes with fake parents
// and cause this peer to issue unnecessary fetch requests.
ensure!(
missing_parents
.iter()
.all(|parent| { parent.verify(&self.epoch_state.verifier).is_ok() }),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be in node.verify()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to only verify the missing parents. Ideally, we won't miss any parents in the normal case and don't have to verify at all.

NodeBroadcastHandleError::InvalidParent
);
// TODO: notify dag fetcher to fetch missing node and drop this node
bail!(NodeBroadcastHandleError::MissingParents);

// Don't issue fetch requests for parents of the lowest round in the DAG
// because they are already GC'ed
if current_round > lowest_round {
if let Err(err) = self.fetch_requester.request_for_node(node) {
error!("request to fetch failed: {}", err);
}
bail!(NodeBroadcastHandleError::MissingParents);
}
}

Ok(())
Ok(node)
}
}

Expand Down Expand Up @@ -137,7 +145,7 @@ impl RpcHandler for NodeBroadcastHandler {
type Response = Vote;

fn process(&mut self, node: Self::Request) -> anyhow::Result<Self::Response> {
self.validate(&node)?;
let node = self.validate(node)?;

let votes_by_peer = self
.votes_by_round_peer
Expand Down
17 changes: 10 additions & 7 deletions consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ fn test_certified_node_handler() {
verifier: validator_verifier,
});
let storage = Arc::new(MockStorage::new());
let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage.clone())));

let zeroth_round_node = new_certified_node(0, signers[0].author(), vec![]);
let dag = Arc::new(RwLock::new(Dag::new(
epoch_state.clone(),
storage.clone(),
1,
)));

let network_sender = Arc::new(MockNetworkSender {});
let rb = Arc::new(ReliableBroadcast::new(
Expand Down Expand Up @@ -115,13 +117,14 @@ fn test_certified_node_handler() {
fetch_requester,
);

let first_round_node = new_certified_node(1, signers[0].author(), vec![]);
// expect an ack for a valid message
assert_ok!(driver.process(zeroth_round_node.clone()));
assert_ok!(driver.process(first_round_node.clone()));
// expect an ack if the same message is sent again
assert_ok_eq!(driver.process(zeroth_round_node), CertifiedAck::new(1));
assert_ok_eq!(driver.process(first_round_node), CertifiedAck::new(1));

let parent_node = new_certified_node(0, signers[1].author(), vec![]);
let invalid_node = new_certified_node(1, signers[0].author(), vec![parent_node.certificate()]);
let parent_node = new_certified_node(1, signers[1].author(), vec![]);
let invalid_node = new_certified_node(2, signers[0].author(), vec![parent_node.certificate()]);
assert_eq!(
driver.process(invalid_node).unwrap_err().to_string(),
DagDriverError::MissingParents.to_string()
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/dag/tests/dag_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ fn setup() -> (Vec<ValidatorSigner>, Arc<EpochState>, Dag, Arc<MockStorage>) {
verifier: validator_verifier,
});
let storage = Arc::new(MockStorage::new());
let dag = Dag::new(epoch_state.clone(), storage.clone());
let dag = Dag::new(epoch_state.clone(), storage.clone(), 1);
(signers, epoch_state, dag, storage)
}

Expand Down Expand Up @@ -190,7 +190,7 @@ fn test_dag_recover_from_storage() {
assert!(dag.add_node(node).is_ok());
}
}
let new_dag = Dag::new(epoch_state.clone(), storage.clone());
let new_dag = Dag::new(epoch_state.clone(), storage.clone(), 1);

for metadata in &metadatas {
assert!(new_dag.exists(metadata));
Expand All @@ -201,7 +201,7 @@ fn test_dag_recover_from_storage() {
verifier: epoch_state.verifier.clone(),
});

let _new_epoch_dag = Dag::new(new_epoch_state, storage.clone());
let _new_epoch_dag = Dag::new(new_epoch_state, storage.clone(), 1);
assert!(storage.certified_node_data.lock().is_empty());
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/fetcher_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn test_dag_fetcher_receiver() {
verifier: validator_verifier,
});
let storage = Arc::new(MockStorage::new());
let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage)));
let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage, 1)));

let mut fetcher = FetchRequestHandler::new(dag.clone(), epoch_state);

Expand Down
7 changes: 4 additions & 3 deletions consensus/src/dag/tests/order_rule_tests.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::helpers::new_certified_node;
use crate::{
dag::{
anchor_election::RoundRobinAnchorElection,
dag_store::Dag,
order_rule::OrderRule,
tests::{dag_test::MockStorage, helpers::new_certified_node},
tests::dag_test::MockStorage,
types::{NodeCertificate, NodeMetadata},
CertifiedNode,
},
Expand Down Expand Up @@ -153,7 +154,7 @@ proptest! {
epoch: 1,
verifier: validator_verifier,
});
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()));
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1);
for round_nodes in &nodes {
for node in round_nodes.iter().flatten() {
dag.add_node(node.clone()).unwrap();
Expand Down Expand Up @@ -231,7 +232,7 @@ fn test_order_rule_basic() {
epoch: 1,
verifier: validator_verifier,
});
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()));
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1);
for round_nodes in &nodes {
for node in round_nodes.iter().flatten() {
dag.add_node(node.clone()).unwrap();
Expand Down
Loading