From b8f7e77aea00159324468e85aa6c476f8e0e796f Mon Sep 17 00:00:00 2001
From: Zekun Li
Date: Fri, 28 Jun 2024 16:28:16 -0700
Subject: [PATCH 1/2] [consensus] fix edge case of block retrieval

---
 consensus/src/block_storage/sync_manager.rs | 21 ++++++++++-----------
 1 file changed, 10 insertions(+), 11 deletions(-)

diff --git a/consensus/src/block_storage/sync_manager.rs b/consensus/src/block_storage/sync_manager.rs
index ddc2120608828..d1e1842b28d52 100644
--- a/consensus/src/block_storage/sync_manager.rs
+++ b/consensus/src/block_storage/sync_manager.rs
@@ -23,7 +23,7 @@ use crate::{
     persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
     pipeline::execution_client::TExecutionClient,
 };
-use anyhow::{bail, Context};
+use anyhow::{anyhow, bail, Context};
 use aptos_consensus_types::{
     block::Block,
     block_retrieval::{
@@ -47,7 +47,7 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
 use futures_channel::oneshot;
 use rand::{prelude::*, Rng};
 use std::{clone::Clone, cmp::min, sync::Arc, time::Duration};
-use tokio::time;
+use tokio::{time, time::timeout};
 
 #[derive(Debug, PartialEq, Eq)]
 /// Whether we need to do block retrieval if we want to insert a Quorum Cert.
@@ -568,15 +568,14 @@ impl BlockRetriever {
             let author = self.network.author();
             futures.push(
                 async move {
-                    let response = rx
-                        .await
-                        .map(|block| {
-                            BlockRetrievalResponse::new(
-                                BlockRetrievalStatus::SucceededWithTarget,
-                                vec![block],
-                            )
-                        })
-                        .map_err(|_| anyhow::anyhow!("self retrieval failed"));
+                    let response = match timeout(rpc_timeout, rx).await {
+                        Ok(Ok(block)) => Ok(BlockRetrievalResponse::new(
+                            BlockRetrievalStatus::SucceededWithTarget,
+                            vec![block],
+                        )),
+                        Ok(Err(_)) => Err(anyhow!("self retrieval cancelled")),
+                        Err(_) => Err(anyhow!("self retrieval timeout")),
+                    };
                     (author, response)
                 }
                 .boxed(),

From d225f1e6a377caffde4adb93bb177b1e466752b8 Mon Sep 17 00:00:00 2001
From: Balaji Arun
Date: Mon, 8 Jul 2024 18:03:07 -0700
Subject: [PATCH 2/2] [consensus] unit test for block retrieval timeout

---
 consensus/src/network_tests.rs      | 37 +++++++++++++++-
 consensus/src/round_manager_test.rs | 68 +++++++++++++++++++++++++++++
 2 files changed, 104 insertions(+), 1 deletion(-)

diff --git a/consensus/src/network_tests.rs b/consensus/src/network_tests.rs
index 55bc30507cf82..ad26baccdf6f3 100644
--- a/consensus/src/network_tests.rs
+++ b/consensus/src/network_tests.rs
@@ -26,7 +26,7 @@ use aptos_network::{
         PeerManagerRequestSender,
     },
     protocols::{
-        network::{NewNetworkEvents, SerializedRequest},
+        network::{NewNetworkEvents, RpcError, SerializedRequest},
         rpc::InboundRpcRequest,
         wire::handshake::v1::ProtocolIdSet,
     },
@@ -75,6 +75,8 @@ pub struct NetworkPlayground {
     outbound_msgs_tx: mpsc::Sender<(TwinId, PeerManagerRequest)>,
     /// NetworkPlayground reads all nodes' outbound messages through this queue.
     outbound_msgs_rx: mpsc::Receiver<(TwinId, PeerManagerRequest)>,
+    /// Allow test code to timeout RPC messages between peers.
+    timeout_config: Arc<RwLock<TimeoutConfig>>,
     /// Allow test code to drop direct-send messages between peers.
     drop_config: Arc<RwLock<DropConfig>>,
     /// Allow test code to drop direct-send messages between peers per round.
@@ -96,6 +98,7 @@ impl NetworkPlayground {
             node_consensus_txs: Arc::new(Mutex::new(HashMap::new())),
             outbound_msgs_tx,
             outbound_msgs_rx,
+            timeout_config: Arc::new(RwLock::new(TimeoutConfig::default())),
             drop_config: Arc::new(RwLock::new(DropConfig::default())),
             drop_config_round: DropConfigRound::default(),
             executor,
@@ -122,6 +125,7 @@ impl NetworkPlayground {
     /// Rpc messages are immediately sent to the destination for handling, so
     /// they don't block.
     async fn start_node_outbound_handler(
+        timeout_config: Arc<RwLock<TimeoutConfig>>,
         drop_config: Arc<RwLock<DropConfig>>,
         src_twin_id: TwinId,
         mut network_reqs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
@@ -160,6 +164,14 @@ impl NetworkPlayground {
                         None => continue, // drop rpc
                     };
 
+                    if timeout_config
+                        .read()
+                        .is_message_timedout(&src_twin_id, dst_twin_id)
+                    {
+                        outbound_req.res_tx.send(Err(RpcError::TimedOut)).unwrap();
+                        continue;
+                    }
+
                     let node_consensus_tx =
                         node_consensus_txs.lock().get(dst_twin_id).unwrap().clone();
 
@@ -195,10 +207,12 @@ impl NetworkPlayground {
     ) {
         self.node_consensus_txs.lock().insert(twin_id, consensus_tx);
         self.drop_config.write().add_node(twin_id);
+        self.timeout_config.write().add_node(twin_id);
         self.extend_author_to_twin_ids(twin_id.author, twin_id);
 
         let fut1 = NetworkPlayground::start_node_outbound_handler(
+            Arc::clone(&self.timeout_config),
             Arc::clone(&self.drop_config),
             twin_id,
             network_reqs_rx,
@@ -374,6 +388,10 @@ impl NetworkPlayground {
         ret
     }
 
+    pub fn timeout_config(&self) -> Arc<RwLock<TimeoutConfig>> {
+        self.timeout_config.clone()
+    }
+
     pub async fn start(mut self) {
         // Take the next queued message
         while let Some((src_twin_id, net_req)) = self.outbound_msgs_rx.next().await {
@@ -453,6 +471,23 @@ impl DropConfig {
     }
 }
 
+#[derive(Default)]
+pub(crate) struct TimeoutConfig(HashMap<TwinId, HashSet<TwinId>>);
+
+impl TimeoutConfig {
+    pub fn is_message_timedout(&self, src: &TwinId, dst: &TwinId) -> bool {
+        self.0.get(src).map_or(false, |set| set.contains(dst))
+    }
+
+    pub fn timeout_message_for(&mut self, src: &TwinId, dst: &TwinId) -> bool {
+        self.0.entry(*src).or_default().insert(*dst)
+    }
+
+    fn add_node(&mut self, src: TwinId) {
+        self.0.insert(src, HashSet::new());
+    }
+}
+
 /// Table of per round message dropping rules
 #[derive(Default)]
 struct DropConfigRound(HashMap<Round, DropConfig>);

diff --git a/consensus/src/round_manager_test.rs b/consensus/src/round_manager_test.rs
index 5b4861c4b5d80..c5a4d3d1e23dd 100644
--- a/consensus/src/round_manager_test.rs
+++ b/consensus/src/round_manager_test.rs
@@ -1853,6 +1853,74 @@ fn block_retrieval_test() {
     });
 }
 
+#[test]
+fn block_retrieval_timeout_test() {
+    let runtime = consensus_runtime();
+    let mut playground = NetworkPlayground::new(runtime.handle().clone());
+    let mut nodes = NodeSetup::create_nodes(
+        &mut playground,
+        runtime.handle().clone(),
+        4,
+        Some(vec![0, 1]),
+        None,
+        None,
+        None,
+        None,
+    );
+    let timeout_config = playground.timeout_config();
+    runtime.spawn(playground.start());
+
+    for i in 0..4 {
+        info!("processing {}", i);
+        process_and_vote_on_proposal(
+            &runtime,
+            &mut nodes,
+            i as usize % 2,
+            &[3],
+            true,
+            None,
+            true,
+            i + 1,
+            i.saturating_sub(1),
+            0,
+        );
+    }
+
+    timed_block_on(&runtime, async {
+        let mut behind_node = nodes.pop().unwrap();
+
+        for node in nodes.iter() {
+            timeout_config.write().timeout_message_for(
+                &TwinId {
+                    id: behind_node.id,
+                    author: behind_node.signer.author(),
+                },
+                &TwinId {
+                    id: node.id,
+                    author: node.signer.author(),
+                },
+            );
+        }
+
+        // Drain the queue on other nodes
+        for node in nodes.iter_mut() {
+            let _ = node.next_proposal().await;
+        }
+
+        info!(
"Processing proposals for behind node {}", + behind_node.identity_desc() + ); + + let proposal_msg = behind_node.next_proposal().await; + behind_node + .round_manager + .process_proposal_msg(proposal_msg) + .await + .unwrap_err(); + }); +} + #[ignore] // TODO: turn this test back on once the flakes have resolved. #[test] pub fn forking_retrieval_test() {