From d917ae05d46250a77702ab464b798913d4f81d74 Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Wed, 2 Aug 2023 19:29:50 -0700 Subject: [PATCH] [dag] network sender implementation --- consensus/src/dag/bootstrap.rs | 4 +- consensus/src/dag/dag_fetcher.rs | 6 +- consensus/src/dag/dag_network.rs | 8 +-- consensus/src/dag/mod.rs | 4 +- consensus/src/dag/tests/dag_driver_tests.rs | 4 +- consensus/src/dag/tests/dag_network_test.rs | 4 +- consensus/src/network.rs | 78 ++++++++++++++++++++- 7 files changed, 92 insertions(+), 16 deletions(-) diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index 6f96d036c51f58..a07c5f73f1e008 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -5,7 +5,7 @@ use super::{ dag_driver::DagDriver, dag_fetcher::{DagFetcher, FetchRequestHandler}, dag_handler::NetworkHandler, - dag_network::DAGNetworkSender, + dag_network::TDAGNetworkSender, dag_store::Dag, order_rule::OrderRule, rb_handler::NodeBroadcastHandler, @@ -32,7 +32,7 @@ pub fn bootstrap_dag( latest_ledger_info: LedgerInfo, storage: Arc, rb_network_sender: Arc>, - dag_network_sender: Arc, + dag_network_sender: Arc, time_service: aptos_time_service::TimeService, payload_client: Arc, ) -> ( diff --git a/consensus/src/dag/dag_fetcher.rs b/consensus/src/dag/dag_fetcher.rs index c93412c1db59ed..7ebce618df1410 100644 --- a/consensus/src/dag/dag_fetcher.rs +++ b/consensus/src/dag/dag_fetcher.rs @@ -3,7 +3,7 @@ use super::{dag_network::RpcWithFallback, types::NodeMetadata, RpcHandler}; use crate::dag::{ - dag_network::DAGNetworkSender, + dag_network::TDAGNetworkSender, dag_store::Dag, types::{CertifiedNode, FetchResponse, Node, RemoteFetchRequest}, }; @@ -121,7 +121,7 @@ impl LocalFetchRequest { pub struct DagFetcher { epoch_state: Arc, - network: Arc, + network: Arc, dag: Arc>, request_rx: Receiver, time_service: TimeService, @@ -130,7 +130,7 @@ pub struct DagFetcher { impl DagFetcher { pub fn new( epoch_state: Arc, - network: Arc, + network: Arc, dag: Arc>, time_service: TimeService, ) -> ( diff --git a/consensus/src/dag/dag_network.rs b/consensus/src/dag/dag_network.rs index fa61f0b40bb008..b56511d961d730 100644 --- a/consensus/src/dag/dag_network.rs +++ b/consensus/src/dag/dag_network.rs @@ -25,7 +25,7 @@ pub trait RpcHandler { } #[async_trait] -pub trait DAGNetworkSender: Send + Sync + RBNetworkSender { +pub trait TDAGNetworkSender: Send + Sync + RBNetworkSender { async fn send_rpc( &self, receiver: Author, @@ -80,7 +80,7 @@ pub struct RpcWithFallback { futures: Pin< Box> + Send>>>>, >, - sender: Arc, + sender: Arc, interval: Pin>, } @@ -90,7 +90,7 @@ impl RpcWithFallback { message: DAGMessage, retry_interval: Duration, rpc_timeout: Duration, - sender: Arc, + sender: Arc, time_service: TimeService, ) -> Self { Self { @@ -107,7 +107,7 @@ impl RpcWithFallback { } async fn send_rpc( - sender: Arc, + sender: Arc, peer: Author, message: DAGMessage, timeout: Duration, diff --git a/consensus/src/dag/mod.rs b/consensus/src/dag/mod.rs index 7b96f4384e3153..602306be427cd3 100644 --- a/consensus/src/dag/mod.rs +++ b/consensus/src/dag/mod.rs @@ -16,5 +16,5 @@ mod storage; mod tests; mod types; -pub use dag_network::RpcHandler; -pub use types::{CertifiedNode, DAGNetworkMessage, Node, NodeId, Vote}; +pub use dag_network::{RpcHandler, TDAGNetworkSender, RpcWithFallback}; +pub use types::{CertifiedNode, DAGNetworkMessage, Node, NodeId, Vote, DAGMessage}; diff --git a/consensus/src/dag/tests/dag_driver_tests.rs b/consensus/src/dag/tests/dag_driver_tests.rs index 018710402f9434..f7e029310939a9 100644 --- a/consensus/src/dag/tests/dag_driver_tests.rs +++ b/consensus/src/dag/tests/dag_driver_tests.rs @@ -5,7 +5,7 @@ use crate::{ anchor_election::RoundRobinAnchorElection, dag_driver::{DagDriver, DagDriverError}, dag_fetcher::DagFetcher, - dag_network::{DAGNetworkSender, RpcWithFallback}, + dag_network::{TDAGNetworkSender, RpcWithFallback}, dag_store::Dag, order_rule::OrderRule, tests::{dag_test::MockStorage, helpers::new_certified_node}, @@ -41,7 +41,7 @@ impl RBNetworkSender for MockNetworkSender { } #[async_trait] -impl DAGNetworkSender for MockNetworkSender { +impl TDAGNetworkSender for MockNetworkSender { async fn send_rpc( &self, _receiver: Author, diff --git a/consensus/src/dag/tests/dag_network_test.rs b/consensus/src/dag/tests/dag_network_test.rs index ab35ce7d3e3449..860c425801f614 100644 --- a/consensus/src/dag/tests/dag_network_test.rs +++ b/consensus/src/dag/tests/dag_network_test.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation use crate::dag::{ - dag_network::{DAGNetworkSender, RpcWithFallback}, + dag_network::{TDAGNetworkSender, RpcWithFallback}, types::{DAGMessage, TestAck, TestMessage}, }; use anyhow::{anyhow, bail}; @@ -41,7 +41,7 @@ impl RBNetworkSender for MockDAGNetworkSender { } #[async_trait] -impl DAGNetworkSender for MockDAGNetworkSender { +impl TDAGNetworkSender for MockDAGNetworkSender { async fn send_rpc( &self, receiver: Author, diff --git a/consensus/src/network.rs b/consensus/src/network.rs index 93f59c1c1111cd..818015973996df 100644 --- a/consensus/src/network.rs +++ b/consensus/src/network.rs @@ -5,7 +5,7 @@ use crate::{ block_storage::tracing::{observe_block, BlockStage}, counters, - dag::DAGNetworkMessage, + dag::{DAGMessage, DAGNetworkMessage, TDAGNetworkSender, RpcWithFallback}, logging::LogEvent, monitor, network_interface::{ConsensusMsg, ConsensusNetworkClient}, @@ -29,10 +29,12 @@ use aptos_network::{ protocols::{network::Event, rpc::error::RpcError}, ProtocolId, }; +use aptos_reliable_broadcast::{RBMessage, RBNetworkSender}; use aptos_types::{ account_address::AccountAddress, epoch_change::EpochChangeProof, ledger_info::LedgerInfoWithSignatures, validator_verifier::ValidatorVerifier, }; +use async_trait::async_trait; use bytes::Bytes; use fail::fail_point; use futures::{ @@ -43,6 +45,7 @@ use futures::{ use serde::{de::DeserializeOwned, Serialize}; use std::{ mem::{discriminant, Discriminant}, + sync::Arc, time::Duration, }; @@ -404,6 +407,79 @@ impl QuorumStoreSender for NetworkSender { } } +// TODO: this can be improved +#[derive(Clone)] +pub struct DAGNetworkSenderImpl { + sender: Arc, + time_service: aptos_time_service::TimeService, +} + +impl DAGNetworkSenderImpl { + pub fn new(sender: Arc) -> Self { + Self { + sender, + time_service: aptos_time_service::TimeService::real(), + } + } +} + +#[async_trait] +impl TDAGNetworkSender for DAGNetworkSenderImpl { + async fn send_rpc( + &self, + receiver: Author, + message: DAGMessage, + timeout: Duration, + ) -> anyhow::Result { + self.sender + .consensus_network_client + .send_rpc(receiver, message.into_network_message(), timeout) + .await + .map_err(|e| anyhow!("invalid rpc response: {}", e)) + .and_then(|msg| TConsensusMsg::from_network_message(msg)) + } + + /// Given a list of potential responders, sending rpc to get response from any of them and could + /// fallback to more in case of failures. + async fn send_rpc_with_fallbacks( + &self, + responders: Vec, + message: DAGMessage, + retry_interval: Duration, + rpc_timeout: Duration, + ) -> RpcWithFallback { + let sender = Arc::new(self.clone()); + RpcWithFallback::new( + responders, + message, + retry_interval, + rpc_timeout, + sender, + self.time_service.clone(), + ) + } +} + +#[async_trait] +impl RBNetworkSender for DAGNetworkSenderImpl +where + M: RBMessage + TConsensusMsg + 'static, +{ + async fn send_rb_rpc( + &self, + receiver: Author, + message: M, + timeout: Duration, + ) -> anyhow::Result { + self.sender + .consensus_network_client + .send_rpc(receiver, message.into_network_message(), timeout) + .await + .map_err(|e| anyhow!("invalid rpc response: {}", e)) + .and_then(|msg| TConsensusMsg::from_network_message(msg)) + } +} + pub struct NetworkTask { consensus_messages_tx: aptos_channel::Sender< (AccountAddress, Discriminant),