diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index df5bbba99c8..daf95fb8c91 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -352,6 +352,31 @@ where !matches!(self.state, HandlerState::Deactivated) } + // NOTE: This function gets polled to completion upon a connection close. + fn poll_close(&mut self, _: &mut Context<'_>) -> Poll> { + // Inform the network behaviour of any failed requests + + while let Some(substream_id) = self.outbound_substreams.keys().next().cloned() { + let outbound_info = self + .outbound_substreams + .remove(&substream_id) + .expect("The value must exist for a key"); + // If the state of the connection is closing, we do not need to report this case to + // the behaviour, as the connection has just closed non-gracefully + if matches!(outbound_info.state, OutboundSubstreamState::Closing(_)) { + continue; + } + + // Register this request as an RPC Error + return Poll::Ready(Some(HandlerEvent::Err(HandlerErr::Outbound { + error: RPCError::Disconnected, + proto: outbound_info.proto, + id: outbound_info.req_id, + }))); + } + Poll::Ready(None) + } + fn poll( &mut self, cx: &mut Context<'_>, diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index f91a5b471ad..349a60e11c0 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -972,6 +972,12 @@ impl Network { .goodbye_peer(peer_id, reason, source); } + /// Hard (ungraceful) disconnect for testing purposes only + /// Use goodbye_peer for disconnections, do not use this function. + pub fn __hard_disconnect_testing_only(&mut self, peer_id: PeerId) { + let _ = self.swarm.disconnect_peer_id(peer_id); + } + /// Returns an iterator over all enr entries in the DHT. pub fn enr_entries(&self) -> Vec { self.discovery().table_entries_enr() diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index a60af4db3db..e2b72f86732 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -3,7 +3,7 @@ mod common; use common::Protocol; -use lighthouse_network::rpc::methods::*; +use lighthouse_network::rpc::{methods::*, RPCError}; use lighthouse_network::{rpc::max_rpc_size, NetworkEvent, ReportSource, Request, Response}; use slog::{debug, warn, Level}; use ssz::Encode; @@ -996,6 +996,96 @@ fn test_tcp_blocks_by_root_chunked_rpc_terminates_correctly() { }) } +#[test] +fn test_disconnect_triggers_rpc_error() { + // set up the logging. The level and enabled logging or not + let log_level = Level::Debug; + let enable_logging = false; + + let log = common::build_log(log_level, enable_logging); + let spec = E::default_spec(); + + let rt = Arc::new(Runtime::new().unwrap()); + // get sender/receiver + rt.block_on(async { + let (mut sender, mut receiver) = common::build_node_pair( + Arc::downgrade(&rt), + &log, + ForkName::Base, + &spec, + Protocol::Tcp, + ) + .await; + + // BlocksByRoot Request + let rpc_request = Request::BlocksByRoot(BlocksByRootRequest::new( + // Must have at least one root for the request to create a stream + vec![Hash256::from_low_u64_be(0)], + &spec, + )); + + // build the sender future + let sender_future = async { + loop { + match sender.next_event().await { + NetworkEvent::PeerConnectedOutgoing(peer_id) => { + // Send a STATUS message + debug!(log, "Sending RPC"); + sender.send_request(peer_id, 42, rpc_request.clone()); + } + NetworkEvent::RPCFailed { error, id: 42, .. } => match error { + RPCError::Disconnected => return, + other => panic!("received unexpected error {:?}", other), + }, + other => { + warn!(log, "Ignoring other event {:?}", other); + } + } + } + }; + + // determine messages to send (PeerId, RequestId). If some, indicates we still need to send + // messages + let mut sending_peer = None; + let receiver_future = async { + loop { + // this future either drives the sending/receiving or times out allowing messages to be + // sent in the timeout + match futures::future::select( + Box::pin(receiver.next_event()), + Box::pin(tokio::time::sleep(Duration::from_secs(1))), + ) + .await + { + futures::future::Either::Left((ev, _)) => match ev { + NetworkEvent::RequestReceived { peer_id, .. } => { + sending_peer = Some(peer_id); + } + other => { + warn!(log, "Ignoring other event {:?}", other); + } + }, + futures::future::Either::Right((_, _)) => {} // The timeout hit, send messages if required + } + + // if we need to send messages send them here. This will happen after a delay + if let Some(peer_id) = sending_peer.take() { + warn!(log, "Receiver got request, disconnecting peer"); + receiver.__hard_disconnect_testing_only(peer_id); + } + } + }; + + tokio::select! { + _ = sender_future => {} + _ = receiver_future => {} + _ = sleep(Duration::from_secs(30)) => { + panic!("Future timed out"); + } + } + }) +} + /// Establishes a pair of nodes and disconnects the pair based on the selected protocol via an RPC /// Goodbye message. fn goodbye_test(log_level: Level, enable_logging: bool, protocol: Protocol) {