Skip to content

Commit

Permalink
[dag] Integrate Dag Fetcher with Dag Driver
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 9, 2023
1 parent 26515a8 commit da2fad5
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 12 deletions.
11 changes: 9 additions & 2 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::{
dag_fetcher::FetchRequester,
order_rule::OrderRule,
storage::DAGStorage,
types::{CertifiedAck, DAGMessage},
Expand All @@ -15,9 +16,10 @@ use crate::{
state_replication::PayloadClient,
util::time_service::TimeService,
};
use anyhow::bail;
use anyhow::{bail, Ok};
use aptos_consensus_types::common::{Author, Payload};
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_reliable_broadcast::ReliableBroadcast;
use aptos_types::{block_info::Round, epoch_state::EpochState};
use futures::{
Expand Down Expand Up @@ -45,6 +47,7 @@ pub(crate) struct DagDriver {
rb_abort_handle: Option<AbortHandle>,
storage: Arc<dyn DAGStorage>,
order_rule: OrderRule,
fetch_requester: Arc<FetchRequester>,
}

impl DagDriver {
Expand All @@ -58,6 +61,7 @@ impl DagDriver {
time_service: Arc<dyn TimeService>,
storage: Arc<dyn DAGStorage>,
order_rule: OrderRule,
fetch_requester: Arc<FetchRequester>,
) -> Self {
// TODO: rebroadcast nodes after recovery
Self {
Expand All @@ -71,6 +75,7 @@ impl DagDriver {
rb_abort_handle: None,
storage,
order_rule,
fetch_requester,
}
}

Expand All @@ -88,7 +93,9 @@ impl DagDriver {
let round = node.metadata().round();

if !dag_writer.all_exists(node.parents_metadata()) {
// TODO(ibalajiarun): implement fetching logic.
if let Err(err) = self.fetch_requester.request_for_certified_node(node) {
error!("request to fetch failed: {}", err);
}
bail!(DagDriverError::MissingParents);
}

Expand Down
64 changes: 56 additions & 8 deletions consensus/src/dag/dag_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,58 @@ use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_time_service::TimeService;
use aptos_types::epoch_state::EpochState;
use futures::StreamExt;
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::sync::{oneshot, mpsc::{Sender, Receiver}};
use std::{collections::HashMap, sync::Arc, time::Duration};
use thiserror::Error as ThisError;
use tokio::sync::{
mpsc::{Receiver, Sender},
oneshot,
};

pub struct FetchRequester {
request_tx: Sender<LocalFetchRequest>,
node_rx_futures: FuturesUnordered<oneshot::Receiver<Node>>,
certified_node_rx_futures: FuturesUnordered<oneshot::Receiver<CertifiedNode>>,
}

impl FetchRequester {
pub fn new(request_tx: Sender<LocalFetchRequest>) -> Self {
Self {
request_tx,
node_rx_futures: FuturesUnordered::new(),
certified_node_rx_futures: FuturesUnordered::new(),
}
}

pub fn request_for_node(&self, node: Node) -> anyhow::Result<()> {
let (res_tx, res_rx) = oneshot::channel();
let fetch_req = LocalFetchRequest::Node(node, res_tx);
self.request_tx
.try_send(fetch_req)
.map_err(|e| anyhow::anyhow!("unable to send fetch request to channel: {}", e))?;
self.node_rx_futures.push(res_rx);
Ok(())
}

pub fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()> {
let (res_tx, res_rx) = oneshot::channel();
let fetch_req = LocalFetchRequest::CertifiedNode(node, res_tx);
self.request_tx
.try_send(fetch_req)
.map_err(|e| anyhow::anyhow!("unable to send fetch request to channel: {}", e))?;
self.certified_node_rx_futures.push(res_rx);
Ok(())
}

pub async fn next_ready_node(&mut self) -> Option<Result<Node, oneshot::error::RecvError>> {
self.node_rx_futures.next().await
}

pub async fn next_ready_certified_node(
&mut self,
) -> Option<Result<CertifiedNode, oneshot::error::RecvError>> {
self.certified_node_rx_futures.next().await
}
}

#[derive(Debug)]
pub enum LocalFetchRequest {
Node(Node, oneshot::Sender<Node>),
CertifiedNode(CertifiedNode, oneshot::Sender<CertifiedNode>),
Expand Down Expand Up @@ -55,7 +99,7 @@ impl LocalFetchRequest {
}
}

struct DagFetcher {
pub struct DagFetcher {
epoch_state: Arc<EpochState>,
network: Arc<dyn DAGNetworkSender>,
dag: Arc<RwLock<Dag>>,
Expand All @@ -69,7 +113,7 @@ impl DagFetcher {
network: Arc<dyn DAGNetworkSender>,
dag: Arc<RwLock<Dag>>,
time_service: TimeService,
) -> (Self, Sender<LocalFetchRequest>) {
) -> (Self, FetchRequester) {
let (request_tx, request_rx) = tokio::sync::mpsc::channel(16);
(
Self {
Expand All @@ -79,7 +123,11 @@ impl DagFetcher {
request_rx,
time_service,
},
request_tx,
FetchRequester {
request_tx,
node_rx_futures: FuturesUnordered::new(),
certified_node_rx_futures: FuturesUnordered::new(),
},
)
}

Expand Down
16 changes: 14 additions & 2 deletions consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
// Copyright © Aptos Foundation

use super::{
dag_driver::DagDriver,
dag_fetcher::{DagFetcher, FetchRequestHandler},
dag_network::DAGNetworkSender,
order_rule::OrderRule,
storage::DAGStorage,
types::TDAGMessage,
};
use crate::{
dag::{
Expand Down Expand Up @@ -41,7 +46,7 @@ impl NetworkHandler {
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
payload_client: Arc<dyn PayloadClient>,
_dag_network_sender: Arc<dyn DAGNetworkSender>,
dag_network_sender: Arc<dyn DAGNetworkSender>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
time_service: Arc<dyn TimeService>,
aptos_time_service: aptos_time_service::TimeService,
Expand All @@ -51,8 +56,14 @@ impl NetworkHandler {
epoch_state.verifier.get_ordered_account_addresses().clone(),
rb_network_sender,
ExponentialBackoff::from_millis(10),
aptos_time_service,
aptos_time_service.clone(),
));
let (dag_fetcher, fetch_requester) = DagFetcher::new(
epoch_state.clone(),
dag_network_sender,
dag.clone(),
aptos_time_service,
);
Self {
dag_rpc_rx,
node_receiver: NodeBroadcastHandler::new(
Expand All @@ -71,6 +82,7 @@ impl NetworkHandler {
time_service,
storage,
order_rule,
Arc::new(fetch_requester),
),
epoch_state: epoch_state.clone(),
fetch_receiver: FetchRequestHandler::new(dag, epoch_state),
Expand Down
6 changes: 6 additions & 0 deletions consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
tests::{dag_test::MockStorage, helpers::new_certified_node},
types::{CertifiedAck, DAGMessage},
RpcHandler, order_rule::OrderRule,
anchor_election::RoundRobinAnchorElection, dag_fetcher::FetchRequester,
},
test_utils::MockPayloadManager,
util::mock_time_service::SimulatedTimeService,
Expand Down Expand Up @@ -81,6 +82,10 @@ fn test_certified_node_handler() {
let (ordered_nodes_sender, _) = futures_channel::mpsc::unbounded();
let validators = signers.iter().map(|vs| vs.author()).collect();
let order_rule = OrderRule::new(epoch_state.clone(), LedgerInfo::mock_genesis(None), dag.clone(), Box::new(RoundRobinAnchorElection::new(validators)), ordered_nodes_sender);

let (request_tx, _) = tokio::sync::mpsc::channel(10);
let fetch_requester = Arc::new(FetchRequester::new(request_tx));

let mut driver = DagDriver::new(
signers[0].author(),
epoch_state,
Expand All @@ -91,6 +96,7 @@ fn test_certified_node_handler() {
time_service,
storage,
order_rule,
fetch_requester,
);

// expect an ack for a valid message
Expand Down

0 comments on commit da2fad5

Please sign in to comment.