Skip to content

Commit

Permalink
[dag] network sender implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 11, 2023
1 parent 2305786 commit 5ff85b5
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 3 deletions.
4 changes: 2 additions & 2 deletions consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ mod storage;
mod tests;
mod types;

pub use dag_network::RpcHandler;
pub use types::{CertifiedNode, DAGNetworkMessage, Node, NodeId, Vote};
pub use dag_network::{RpcHandler, DAGNetworkSender, RpcWithFallback};
pub use types::{CertifiedNode, DAGNetworkMessage, Node, NodeId, Vote, DAGMessage};
78 changes: 77 additions & 1 deletion consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::{
block_storage::tracing::{observe_block, BlockStage},
counters,
dag::DAGNetworkMessage,
dag::{DAGMessage, DAGNetworkMessage, DAGNetworkSender, RpcWithFallback},
logging::LogEvent,
monitor,
network_interface::{ConsensusMsg, ConsensusNetworkClient},
Expand All @@ -29,10 +29,12 @@ use aptos_network::{
protocols::{network::Event, rpc::error::RpcError},
ProtocolId,
};
use aptos_reliable_broadcast::{RBMessage, RBNetworkSender};
use aptos_types::{
account_address::AccountAddress, epoch_change::EpochChangeProof,
ledger_info::LedgerInfoWithSignatures, validator_verifier::ValidatorVerifier,
};
use async_trait::async_trait;
use bytes::Bytes;
use fail::fail_point;
use futures::{
Expand All @@ -43,6 +45,7 @@ use futures::{
use serde::{de::DeserializeOwned, Serialize};
use std::{
mem::{discriminant, Discriminant},
sync::Arc,
time::Duration,
};

Expand Down Expand Up @@ -404,6 +407,79 @@ impl QuorumStoreSender for NetworkSender {
}
}

// TODO: this can be improved
#[derive(Clone)]
pub struct DAGNetworkSenderImpl {
sender: Arc<NetworkSender>,
time_service: aptos_time_service::TimeService,
}

impl DAGNetworkSenderImpl {
pub fn new(sender: Arc<NetworkSender>) -> Self {
Self {
sender,
time_service: aptos_time_service::TimeService::real(),
}
}
}

#[async_trait]
impl DAGNetworkSender for DAGNetworkSenderImpl {
async fn send_rpc(
&self,
receiver: Author,
message: DAGMessage,
timeout: Duration,
) -> anyhow::Result<DAGMessage> {
self.sender
.consensus_network_client
.send_rpc(receiver, message.into_network_message(), timeout)
.await
.map_err(|e| anyhow!("invalid rpc response: {}", e))
.and_then(|msg| TConsensusMsg::from_network_message(msg))
}

/// Given a list of potential responders, sending rpc to get response from any of them and could
/// fallback to more in case of failures.
async fn send_rpc_with_fallbacks(
&self,
responders: Vec<Author>,
message: DAGMessage,
retry_interval: Duration,
rpc_timeout: Duration,
) -> RpcWithFallback {
let sender = Arc::new(self.clone());
RpcWithFallback::new(
responders,
message,
retry_interval,
rpc_timeout,
sender,
self.time_service.clone(),
)
}
}

#[async_trait]
impl<M> RBNetworkSender<M> for DAGNetworkSenderImpl
where
M: RBMessage + TConsensusMsg + 'static,
{
async fn send_rb_rpc(
&self,
receiver: Author,
message: M,
timeout: Duration,
) -> anyhow::Result<M> {
self.sender
.consensus_network_client
.send_rpc(receiver, message.into_network_message(), timeout)
.await
.map_err(|e| anyhow!("invalid rpc response: {}", e))
.and_then(|msg| TConsensusMsg::from_network_message(msg))
}
}

pub struct NetworkTask {
consensus_messages_tx: aptos_channel::Sender<
(AccountAddress, Discriminant<ConsensusMsg>),
Expand Down

0 comments on commit 5ff85b5

Please sign in to comment.