Skip to content

Commit

Permalink
[dag] network sender implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 21, 2023
1 parent 48de391 commit d917ae0
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 16 deletions.
4 changes: 2 additions & 2 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::{
dag_driver::DagDriver,
dag_fetcher::{DagFetcher, FetchRequestHandler},
dag_handler::NetworkHandler,
dag_network::DAGNetworkSender,
dag_network::TDAGNetworkSender,
dag_store::Dag,
order_rule::OrderRule,
rb_handler::NodeBroadcastHandler,
Expand All @@ -32,7 +32,7 @@ pub fn bootstrap_dag(
latest_ledger_info: LedgerInfo,
storage: Arc<dyn DAGStorage>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
dag_network_sender: Arc<dyn DAGNetworkSender>,
dag_network_sender: Arc<dyn TDAGNetworkSender>,
time_service: aptos_time_service::TimeService,
payload_client: Arc<dyn PayloadClient>,
) -> (
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/dag/dag_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use super::{dag_network::RpcWithFallback, types::NodeMetadata, RpcHandler};
use crate::dag::{
dag_network::DAGNetworkSender,
dag_network::TDAGNetworkSender,
dag_store::Dag,
types::{CertifiedNode, FetchResponse, Node, RemoteFetchRequest},
};
Expand Down Expand Up @@ -121,7 +121,7 @@ impl LocalFetchRequest {

pub struct DagFetcher {
epoch_state: Arc<EpochState>,
network: Arc<dyn DAGNetworkSender>,
network: Arc<dyn TDAGNetworkSender>,
dag: Arc<RwLock<Dag>>,
request_rx: Receiver<LocalFetchRequest>,
time_service: TimeService,
Expand All @@ -130,7 +130,7 @@ pub struct DagFetcher {
impl DagFetcher {
pub fn new(
epoch_state: Arc<EpochState>,
network: Arc<dyn DAGNetworkSender>,
network: Arc<dyn TDAGNetworkSender>,
dag: Arc<RwLock<Dag>>,
time_service: TimeService,
) -> (
Expand Down
8 changes: 4 additions & 4 deletions consensus/src/dag/dag_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub trait RpcHandler {
}

#[async_trait]
pub trait DAGNetworkSender: Send + Sync + RBNetworkSender<DAGMessage> {
pub trait TDAGNetworkSender: Send + Sync + RBNetworkSender<DAGMessage> {
async fn send_rpc(
&self,
receiver: Author,
Expand Down Expand Up @@ -80,7 +80,7 @@ pub struct RpcWithFallback {
futures: Pin<
Box<FuturesUnordered<Pin<Box<dyn Future<Output = anyhow::Result<DAGMessage>> + Send>>>>,
>,
sender: Arc<dyn DAGNetworkSender>,
sender: Arc<dyn TDAGNetworkSender>,
interval: Pin<Box<Interval>>,
}

Expand All @@ -90,7 +90,7 @@ impl RpcWithFallback {
message: DAGMessage,
retry_interval: Duration,
rpc_timeout: Duration,
sender: Arc<dyn DAGNetworkSender>,
sender: Arc<dyn TDAGNetworkSender>,
time_service: TimeService,
) -> Self {
Self {
Expand All @@ -107,7 +107,7 @@ impl RpcWithFallback {
}

async fn send_rpc(
sender: Arc<dyn DAGNetworkSender>,
sender: Arc<dyn TDAGNetworkSender>,
peer: Author,
message: DAGMessage,
timeout: Duration,
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ mod storage;
mod tests;
mod types;

pub use dag_network::RpcHandler;
pub use types::{CertifiedNode, DAGNetworkMessage, Node, NodeId, Vote};
pub use dag_network::{RpcHandler, TDAGNetworkSender, RpcWithFallback};
pub use types::{CertifiedNode, DAGNetworkMessage, Node, NodeId, Vote, DAGMessage};
4 changes: 2 additions & 2 deletions consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
anchor_election::RoundRobinAnchorElection,
dag_driver::{DagDriver, DagDriverError},
dag_fetcher::DagFetcher,
dag_network::{DAGNetworkSender, RpcWithFallback},
dag_network::{TDAGNetworkSender, RpcWithFallback},
dag_store::Dag,
order_rule::OrderRule,
tests::{dag_test::MockStorage, helpers::new_certified_node},
Expand Down Expand Up @@ -41,7 +41,7 @@ impl RBNetworkSender<DAGMessage> for MockNetworkSender {
}

#[async_trait]
impl DAGNetworkSender for MockNetworkSender {
impl TDAGNetworkSender for MockNetworkSender {
async fn send_rpc(
&self,
_receiver: Author,
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/dag/tests/dag_network_test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation

use crate::dag::{
dag_network::{DAGNetworkSender, RpcWithFallback},
dag_network::{TDAGNetworkSender, RpcWithFallback},
types::{DAGMessage, TestAck, TestMessage},
};
use anyhow::{anyhow, bail};
Expand Down Expand Up @@ -41,7 +41,7 @@ impl RBNetworkSender<DAGMessage> for MockDAGNetworkSender {
}

#[async_trait]
impl DAGNetworkSender for MockDAGNetworkSender {
impl TDAGNetworkSender for MockDAGNetworkSender {
async fn send_rpc(
&self,
receiver: Author,
Expand Down
78 changes: 77 additions & 1 deletion consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::{
block_storage::tracing::{observe_block, BlockStage},
counters,
dag::DAGNetworkMessage,
dag::{DAGMessage, DAGNetworkMessage, TDAGNetworkSender, RpcWithFallback},
logging::LogEvent,
monitor,
network_interface::{ConsensusMsg, ConsensusNetworkClient},
Expand All @@ -29,10 +29,12 @@ use aptos_network::{
protocols::{network::Event, rpc::error::RpcError},
ProtocolId,
};
use aptos_reliable_broadcast::{RBMessage, RBNetworkSender};
use aptos_types::{
account_address::AccountAddress, epoch_change::EpochChangeProof,
ledger_info::LedgerInfoWithSignatures, validator_verifier::ValidatorVerifier,
};
use async_trait::async_trait;
use bytes::Bytes;
use fail::fail_point;
use futures::{
Expand All @@ -43,6 +45,7 @@ use futures::{
use serde::{de::DeserializeOwned, Serialize};
use std::{
mem::{discriminant, Discriminant},
sync::Arc,
time::Duration,
};

Expand Down Expand Up @@ -404,6 +407,79 @@ impl QuorumStoreSender for NetworkSender {
}
}

// TODO: this can be improved
#[derive(Clone)]
pub struct DAGNetworkSenderImpl {
sender: Arc<NetworkSender>,
time_service: aptos_time_service::TimeService,
}

impl DAGNetworkSenderImpl {
pub fn new(sender: Arc<NetworkSender>) -> Self {
Self {
sender,
time_service: aptos_time_service::TimeService::real(),
}
}
}

#[async_trait]
impl TDAGNetworkSender for DAGNetworkSenderImpl {
async fn send_rpc(
&self,
receiver: Author,
message: DAGMessage,
timeout: Duration,
) -> anyhow::Result<DAGMessage> {
self.sender
.consensus_network_client
.send_rpc(receiver, message.into_network_message(), timeout)
.await
.map_err(|e| anyhow!("invalid rpc response: {}", e))
.and_then(|msg| TConsensusMsg::from_network_message(msg))
}

/// Given a list of potential responders, sending rpc to get response from any of them and could
/// fallback to more in case of failures.
async fn send_rpc_with_fallbacks(
&self,
responders: Vec<Author>,
message: DAGMessage,
retry_interval: Duration,
rpc_timeout: Duration,
) -> RpcWithFallback {
let sender = Arc::new(self.clone());
RpcWithFallback::new(
responders,
message,
retry_interval,
rpc_timeout,
sender,
self.time_service.clone(),
)
}
}

#[async_trait]
impl<M> RBNetworkSender<M> for DAGNetworkSenderImpl
where
M: RBMessage + TConsensusMsg + 'static,
{
async fn send_rb_rpc(
&self,
receiver: Author,
message: M,
timeout: Duration,
) -> anyhow::Result<M> {
self.sender
.consensus_network_client
.send_rpc(receiver, message.into_network_message(), timeout)
.await
.map_err(|e| anyhow!("invalid rpc response: {}", e))
.and_then(|msg| TConsensusMsg::from_network_message(msg))
}
}

pub struct NetworkTask {
consensus_messages_tx: aptos_channel::Sender<
(AccountAddress, Discriminant<ConsensusMsg>),
Expand Down

0 comments on commit d917ae0

Please sign in to comment.