[cherry-pick][1.16][consensus] fix edge case of block retrieval #13903

Merged · 2 commits · Jul 9, 2024
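
This cherry-pick fixes an edge case in which a node retrieving a block from itself awaited a oneshot receiver with no deadline and could therefore hang forever. The self-retrieval future is now bounded by the existing `rpc_timeout`, with a cancelled sender distinguished from a timeout, and the network test harness gains the ability to simulate timed-out RPCs.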
consensus/src/block_storage/sync_manager.rs (10 additions, 11 deletions)
@@ -23,7 +23,7 @@ use crate::{
     persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
     pipeline::execution_client::TExecutionClient,
 };
-use anyhow::{bail, Context};
+use anyhow::{anyhow, bail, Context};
 use aptos_consensus_types::{
     block::Block,
     block_retrieval::{
@@ -47,7 +47,7 @@ use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
 use futures_channel::oneshot;
 use rand::{prelude::*, Rng};
 use std::{clone::Clone, cmp::min, sync::Arc, time::Duration};
-use tokio::time;
+use tokio::{time, time::timeout};
 
 #[derive(Debug, PartialEq, Eq)]
 /// Whether we need to do block retrieval if we want to insert a Quorum Cert.
@@ -568,15 +568,14 @@ impl BlockRetriever {
             let author = self.network.author();
             futures.push(
                 async move {
-                    let response = rx
-                        .await
-                        .map(|block| {
-                            BlockRetrievalResponse::new(
-                                BlockRetrievalStatus::SucceededWithTarget,
-                                vec![block],
-                            )
-                        })
-                        .map_err(|_| anyhow::anyhow!("self retrieval failed"));
+                    let response = match timeout(rpc_timeout, rx).await {
+                        Ok(Ok(block)) => Ok(BlockRetrievalResponse::new(
+                            BlockRetrievalStatus::SucceededWithTarget,
+                            vec![block],
+                        )),
+                        Ok(Err(_)) => Err(anyhow!("self retrieval cancelled")),
+                        Err(_) => Err(anyhow!("self retrieval timeout")),
+                    };
                     (author, response)
                 }
                 .boxed(),
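
For readers unfamiliar with the pattern: `tokio::time::timeout` wraps any future and yields an `Err` if it does not resolve within the deadline, so awaiting a oneshot receiver can no longer block indefinitely. A minimal, self-contained sketch of the same pattern (illustrative names only, not code from this PR):

```rust
use std::time::Duration;
use tokio::{sync::oneshot, time::timeout};

#[tokio::main]
async fn main() {
    // Sender deliberately kept alive but never used, standing in for a
    // block-retrieval request that never gets a response.
    let (_tx, rx) = oneshot::channel::<u64>();

    // `timeout` turns an indefinite await into an Err after one second,
    // and the nested Result distinguishes a dropped sender from a timeout.
    match timeout(Duration::from_secs(1), rx).await {
        Ok(Ok(value)) => println!("received {value}"),
        Ok(Err(_)) => println!("sender dropped: retrieval cancelled"),
        Err(_) => println!("no response: retrieval timed out"),
    }
}
```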
consensus/src/network_tests.rs (36 additions, 1 deletion)
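To make the timeout path testable, `NetworkPlayground` gains a `TimeoutConfig` that lets a test answer selected peer-to-peer RPCs with `RpcError::TimedOut` instead of delivering them: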
@@ -26,7 +26,7 @@ use aptos_network::{
         PeerManagerRequestSender,
     },
     protocols::{
-        network::{NewNetworkEvents, SerializedRequest},
+        network::{NewNetworkEvents, RpcError, SerializedRequest},
         rpc::InboundRpcRequest,
         wire::handshake::v1::ProtocolIdSet,
     },
@@ -75,6 +75,8 @@ pub struct NetworkPlayground {
     outbound_msgs_tx: mpsc::Sender<(TwinId, PeerManagerRequest)>,
     /// NetworkPlayground reads all nodes' outbound messages through this queue.
     outbound_msgs_rx: mpsc::Receiver<(TwinId, PeerManagerRequest)>,
+    /// Allow test code to timeout RPC messages between peers.
+    timeout_config: Arc<RwLock<TimeoutConfig>>,
     /// Allow test code to drop direct-send messages between peers.
     drop_config: Arc<RwLock<DropConfig>>,
     /// Allow test code to drop direct-send messages between peers per round.
@@ -96,6 +98,7 @@ impl NetworkPlayground {
             node_consensus_txs: Arc::new(Mutex::new(HashMap::new())),
             outbound_msgs_tx,
             outbound_msgs_rx,
+            timeout_config: Arc::new(RwLock::new(TimeoutConfig::default())),
             drop_config: Arc::new(RwLock::new(DropConfig::default())),
             drop_config_round: DropConfigRound::default(),
             executor,
@@ -122,6 +125,7 @@ impl NetworkPlayground {
     /// Rpc messages are immediately sent to the destination for handling, so
     /// they don't block.
     async fn start_node_outbound_handler(
+        timeout_config: Arc<RwLock<TimeoutConfig>>,
         drop_config: Arc<RwLock<DropConfig>>,
         src_twin_id: TwinId,
         mut network_reqs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
@@ -160,6 +164,14 @@ impl NetworkPlayground {
                         None => continue, // drop rpc
                     };
 
+                    if timeout_config
+                        .read()
+                        .is_message_timedout(&src_twin_id, dst_twin_id)
+                    {
+                        outbound_req.res_tx.send(Err(RpcError::TimedOut)).unwrap();
+                        continue;
+                    }
+
                     let node_consensus_tx =
                         node_consensus_txs.lock().get(dst_twin_id).unwrap().clone();
 
@@ -195,10 +207,12 @@ impl NetworkPlayground {
     ) {
         self.node_consensus_txs.lock().insert(twin_id, consensus_tx);
         self.drop_config.write().add_node(twin_id);
+        self.timeout_config.write().add_node(twin_id);
 
         self.extend_author_to_twin_ids(twin_id.author, twin_id);
 
         let fut1 = NetworkPlayground::start_node_outbound_handler(
+            Arc::clone(&self.timeout_config),
             Arc::clone(&self.drop_config),
             twin_id,
             network_reqs_rx,
@@ -374,6 +388,10 @@ impl NetworkPlayground {
         ret
     }
 
+    pub fn timeout_config(&self) -> Arc<RwLock<TimeoutConfig>> {
+        self.timeout_config.clone()
+    }
+
     pub async fn start(mut self) {
         // Take the next queued message
         while let Some((src_twin_id, net_req)) = self.outbound_msgs_rx.next().await {
@@ -453,6 +471,23 @@ impl DropConfig {
     }
 }
 
+#[derive(Default)]
+pub(crate) struct TimeoutConfig(HashMap<TwinId, HashSet<TwinId>>);
+
+impl TimeoutConfig {
+    pub fn is_message_timedout(&self, src: &TwinId, dst: &TwinId) -> bool {
+        self.0.get(src).map_or(false, |set| set.contains(dst))
+    }
+
+    pub fn timeout_message_for(&mut self, src: &TwinId, dst: &TwinId) -> bool {
+        self.0.entry(*src).or_default().insert(*dst)
+    }
+
+    fn add_node(&mut self, src: TwinId) {
+        self.0.insert(src, HashSet::new());
+    }
+}
+
 /// Table of per round message dropping rules
 #[derive(Default)]
 struct DropConfigRound(HashMap<u64, DropConfig>);
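
`TimeoutConfig` mirrors the existing `DropConfig` pattern: a map from a source twin to the set of destination twins whose RPCs are answered with `RpcError::TimedOut` rather than delivered. Note that `timeout_message_for` returns the `bool` from `HashSet::insert`, i.e. whether the rule was newly added.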
consensus/src/round_manager_test.rs (68 additions, 0 deletions)
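The new test drives a four-node setup in which the behind node's block-retrieval RPCs to every peer are configured to time out; processing the next proposal must then fail with an error rather than hang, which is the edge case fixed in `sync_manager.rs`: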
@@ -1853,6 +1853,74 @@ fn block_retrieval_test() {
     });
 }
 
+#[test]
+fn block_retrieval_timeout_test() {
+    let runtime = consensus_runtime();
+    let mut playground = NetworkPlayground::new(runtime.handle().clone());
+    let mut nodes = NodeSetup::create_nodes(
+        &mut playground,
+        runtime.handle().clone(),
+        4,
+        Some(vec![0, 1]),
+        None,
+        None,
+        None,
+        None,
+    );
+    let timeout_config = playground.timeout_config();
+    runtime.spawn(playground.start());
+
+    for i in 0..4 {
+        info!("processing {}", i);
+        process_and_vote_on_proposal(
+            &runtime,
+            &mut nodes,
+            i as usize % 2,
+            &[3],
+            true,
+            None,
+            true,
+            i + 1,
+            i.saturating_sub(1),
+            0,
+        );
+    }
+
+    timed_block_on(&runtime, async {
+        let mut behind_node = nodes.pop().unwrap();
+
+        for node in nodes.iter() {
+            timeout_config.write().timeout_message_for(
+                &TwinId {
+                    id: behind_node.id,
+                    author: behind_node.signer.author(),
+                },
+                &TwinId {
+                    id: node.id,
+                    author: node.signer.author(),
+                },
+            );
+        }
+
+        // Drain the queue on other nodes
+        for node in nodes.iter_mut() {
+            let _ = node.next_proposal().await;
+        }
+
+        info!(
+            "Processing proposals for behind node {}",
+            behind_node.identity_desc()
+        );
+
+        let proposal_msg = behind_node.next_proposal().await;
+        behind_node
+            .round_manager
+            .process_proposal_msg(proposal_msg)
+            .await
+            .unwrap_err();
+    });
+}
+
 #[ignore] // TODO: turn this test back on once the flakes have resolved.
 #[test]
 pub fn forking_retrieval_test() {