Skip to content

Commit

Permalink
[dag] Integrate Order Rule with Dag Driver
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 9, 2023
1 parent c40e5e7 commit 26515a8
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 10 deletions.
12 changes: 10 additions & 2 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::{
order_rule::OrderRule,
storage::DAGStorage,
types::{CertifiedAck, DAGMessage},
RpcHandler,
Expand Down Expand Up @@ -43,6 +44,7 @@ pub(crate) struct DagDriver {
time_service: Arc<dyn TimeService>,
rb_abort_handle: Option<AbortHandle>,
storage: Arc<dyn DAGStorage>,
order_rule: OrderRule,
}

impl DagDriver {
Expand All @@ -55,6 +57,7 @@ impl DagDriver {
current_round: Round,
time_service: Arc<dyn TimeService>,
storage: Arc<dyn DAGStorage>,
order_rule: OrderRule,
) -> Self {
// TODO: rebroadcast nodes after recovery
Self {
Expand All @@ -67,6 +70,7 @@ impl DagDriver {
time_service,
rb_abort_handle: None,
storage,
order_rule,
}
}

Expand Down Expand Up @@ -149,11 +153,15 @@ impl RpcHandler for DagDriver {
{
let dag_reader = self.dag.read();
if dag_reader.exists(node.metadata()) {
return Ok(CertifiedAck::new(node.metadata().epoch()));
return Ok(CertifiedAck::new(epoch));
}
}

self.add_node(node)?;
let node_metadata = node.metadata().clone();
self.add_node(node).and_then(|_| {
self.order_rule.process_new_node(&node_metadata);
Ok(())
})?;

Ok(CertifiedAck::new(epoch))
}
Expand Down
5 changes: 3 additions & 2 deletions consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
// Copyright © Aptos Foundation

use super::{
dag_driver::DagDriver, dag_fetcher::FetchRequestHandler, dag_network::DAGNetworkSender,
storage::DAGStorage, types::TDAGMessage,
order_rule::OrderRule,
};
use crate::{
dag::{
Expand Down Expand Up @@ -46,6 +45,7 @@ impl NetworkHandler {
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
time_service: Arc<dyn TimeService>,
aptos_time_service: aptos_time_service::TimeService,
order_rule: OrderRule,
) -> Self {
let rb = Arc::new(ReliableBroadcast::new(
epoch_state.verifier.get_ordered_account_addresses().clone(),
Expand All @@ -70,6 +70,7 @@ impl NetworkHandler {
1,
time_service,
storage,
order_rule,
),
epoch_state: epoch_state.clone(),
fetch_receiver: FetchRequestHandler::new(dag, epoch_state),
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/dag/order_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ impl OrderRule {
(r1 ^ r2) & 1 == 0
}

pub fn process_new_node(&mut self, node: &CertifiedNode) {
let round = node.round();
pub fn process_new_node(&mut self, node_metadata: &NodeMetadata) {
let round = node_metadata.round();
// If the node comes from the proposal round in the current instance, it can't trigger any ordering
if round <= self.lowest_unordered_anchor_round
|| Self::check_parity(round, self.lowest_unordered_anchor_round)
Expand Down
8 changes: 6 additions & 2 deletions consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use crate::{
dag_store::Dag,
tests::{dag_test::MockStorage, helpers::new_certified_node},
types::{CertifiedAck, DAGMessage},
RpcHandler,
RpcHandler, order_rule::OrderRule,
},
test_utils::MockPayloadManager,
util::mock_time_service::SimulatedTimeService,
};
use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_types::{epoch_state::EpochState, validator_verifier::random_validator_verifier};
use aptos_types::{epoch_state::EpochState, validator_verifier::random_validator_verifier, ledger_info::LedgerInfo};
use async_trait::async_trait;
use claims::{assert_ok, assert_ok_eq};
use std::{sync::Arc, time::Duration};
Expand Down Expand Up @@ -78,6 +78,9 @@ fn test_certified_node_handler() {
aptos_time_service::TimeService::mock(),
));
let time_service = Arc::new(SimulatedTimeService::new());
let (ordered_nodes_sender, _) = futures_channel::mpsc::unbounded();
let validators = signers.iter().map(|vs| vs.author()).collect();
let order_rule = OrderRule::new(epoch_state.clone(), LedgerInfo::mock_genesis(None), dag.clone(), Box::new(RoundRobinAnchorElection::new(validators)), ordered_nodes_sender);
let mut driver = DagDriver::new(
signers[0].author(),
epoch_state,
Expand All @@ -87,6 +90,7 @@ fn test_certified_node_handler() {
1,
time_service,
storage,
order_rule,
);

// expect an ack for a valid message
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/dag/tests/order_rule_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ proptest! {
let dag = Arc::new(RwLock::new(dag.clone()));
let (mut order_rule, mut receiver) = create_order_rule(epoch_state.clone(), dag);
for idx in seq {
order_rule.process_new_node(&flatten_nodes[idx]);
order_rule.process_new_node(flatten_nodes[idx].metadata());
}
let mut ordered = vec![];
while let Ok(Some(mut ordered_nodes)) = receiver.try_next() {
Expand Down Expand Up @@ -241,7 +241,7 @@ fn test_order_rule_basic() {
let dag = Arc::new(RwLock::new(dag.clone()));
let (mut order_rule, mut receiver) = create_order_rule(epoch_state, dag);
for node in nodes.iter().flatten().flatten() {
order_rule.process_new_node(node);
order_rule.process_new_node(node.metadata());
}
let expected_order = vec![
// anchor (1, 0) has 1 votes, anchor (3, 1) has 2 votes and a path to (1, 0)
Expand Down

0 comments on commit 26515a8

Please sign in to comment.