diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index cb537f23590..df5bbba99c8 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -163,7 +163,7 @@ struct InboundInfo { /// Protocol of the original request we received from the peer. protocol: Protocol, /// Responses that the peer is still expecting from us. - remaining_chunks: u64, + max_remaining_chunks: u64, /// Useful to timing how long each request took to process. Currently only used by /// BlocksByRange. request_start_time: Instant, @@ -180,7 +180,7 @@ struct OutboundInfo { /// Info over the protocol this substream is handling. proto: Protocol, /// Number of chunks to be seen from the peer's response. - remaining_chunks: Option, + max_remaining_chunks: Option, /// `Id` as given by the application that sent the request. req_id: Id, } @@ -471,7 +471,7 @@ where // Process one more message if one exists. if let Some(message) = info.pending_items.pop_front() { // If this is the last chunk, terminate the stream. - let last_chunk = info.remaining_chunks <= 1; + let last_chunk = info.max_remaining_chunks <= 1; let fut = send_message_to_inbound_substream(substream, message, last_chunk) .boxed(); @@ -537,7 +537,8 @@ where { // The substream is still active, decrement the remaining // chunks expected. - info.remaining_chunks = info.remaining_chunks.saturating_sub(1); + info.max_remaining_chunks = + info.max_remaining_chunks.saturating_sub(1); // If this substream has not ended, we reset the timer. // Each chunk is allowed RESPONSE_TIMEOUT to be sent. @@ -552,7 +553,7 @@ where // Process one more message if one exists. if let Some(message) = info.pending_items.pop_front() { // If this is the last chunk, terminate the stream. - let last_chunk = info.remaining_chunks <= 1; + let last_chunk = info.max_remaining_chunks <= 1; let fut = send_message_to_inbound_substream( substream, message, last_chunk, ) @@ -664,15 +665,19 @@ where request, } => match substream.poll_next_unpin(cx) { Poll::Ready(Some(Ok(response))) => { - if request.expected_responses() > 1 && !response.close_after() { + if request.expect_exactly_one_response() || response.close_after() { + // either this is a single response request or this response closes the + // stream + entry.get_mut().state = OutboundSubstreamState::Closing(substream); + } else { let substream_entry = entry.get_mut(); let delay_key = &substream_entry.delay_key; // chunks left after this one - let remaining_chunks = substream_entry - .remaining_chunks + let max_remaining_chunks = substream_entry + .max_remaining_chunks .map(|count| count.saturating_sub(1)) .unwrap_or_else(|| 0); - if remaining_chunks == 0 { + if max_remaining_chunks == 0 { // this is the last expected message, close the stream as all expected chunks have been received substream_entry.state = OutboundSubstreamState::Closing(substream); } else { @@ -682,14 +687,10 @@ where substream, request, }; - substream_entry.remaining_chunks = Some(remaining_chunks); + substream_entry.max_remaining_chunks = Some(max_remaining_chunks); self.outbound_substreams_delay .reset(delay_key, self.resp_timeout); } - } else { - // either this is a single response request or this response closes the - // stream - entry.get_mut().state = OutboundSubstreamState::Closing(substream); } // Check what type of response we got and report it accordingly @@ -725,7 +726,16 @@ where self.outbound_substreams_delay.remove(delay_key); entry.remove_entry(); // notify the application error - if request.expected_responses() > 1 { + if request.expect_exactly_one_response() { + // return an error, stream should not have closed early. + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( + HandlerEvent::Err(HandlerErr::Outbound { + id: request_id, + proto: request.versioned_protocol().protocol(), + error: RPCError::IncompleteStream, + }), + )); + } else { // return an end of stream result return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( HandlerEvent::Ok(RPCReceived::EndOfStream( @@ -734,16 +744,6 @@ where )), )); } - - // else we return an error, stream should not have closed early. - let outbound_err = HandlerErr::Outbound { - id: request_id, - proto: request.versioned_protocol().protocol(), - error: RPCError::IncompleteStream, - }; - return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( - HandlerEvent::Err(outbound_err), - )); } Poll::Pending => { entry.get_mut().state = @@ -880,10 +880,10 @@ where } let (req, substream) = substream; - let expected_responses = req.expected_responses(); + let max_responses = req.max_responses(); // store requests that expect responses - if expected_responses > 0 { + if max_responses > 0 { if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS { // Store the stream and tag the output. let delay_key = self @@ -894,14 +894,13 @@ where self.current_inbound_substream_id, InboundInfo { state: awaiting_stream, - pending_items: VecDeque::with_capacity(std::cmp::min( - expected_responses, - 128, - ) as usize), + pending_items: VecDeque::with_capacity( + std::cmp::min(max_responses, 128) as usize + ), delay_key: Some(delay_key), protocol: req.versioned_protocol().protocol(), request_start_time: Instant::now(), - remaining_chunks: expected_responses, + max_remaining_chunks: max_responses, }, ); } else { @@ -948,8 +947,14 @@ where } // add the stream to substreams if we expect a response, otherwise drop the stream. - let expected_responses = request.expected_responses(); - if expected_responses > 0 { + let max_responses = request.max_responses(); + if max_responses > 0 { + let max_remaining_chunks = if request.expect_exactly_one_response() { + // Currently enforced only for multiple responses + None + } else { + Some(max_responses) + }; // new outbound request. Store the stream and tag the output. let delay_key = self .outbound_substreams_delay @@ -958,12 +963,6 @@ where substream: Box::new(substream), request, }; - let expected_responses = if expected_responses > 1 { - // Currently enforced only for multiple responses - Some(expected_responses) - } else { - None - }; if self .outbound_substreams .insert( @@ -972,7 +971,7 @@ where state: awaiting_stream, delay_key, proto, - remaining_chunks: expected_responses, + max_remaining_chunks, req_id: id, }, ) diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 67eea09ea77..1b0486ff771 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -483,27 +483,6 @@ impl RPCCodedResponse { RPCCodedResponse::Error(code, err) } - /// Specifies which response allows for multiple chunks for the stream handler. - pub fn multiple_responses(&self) -> bool { - match self { - RPCCodedResponse::Success(resp) => match resp { - RPCResponse::Status(_) => false, - RPCResponse::BlocksByRange(_) => true, - RPCResponse::BlocksByRoot(_) => true, - RPCResponse::BlobsByRange(_) => true, - RPCResponse::BlobsByRoot(_) => true, - RPCResponse::Pong(_) => false, - RPCResponse::MetaData(_) => false, - RPCResponse::LightClientBootstrap(_) => false, - RPCResponse::LightClientOptimisticUpdate(_) => false, - RPCResponse::LightClientFinalityUpdate(_) => false, - }, - RPCCodedResponse::Error(_, _) => true, - // Stream terminations are part of responses that have chunks - RPCCodedResponse::StreamTermination(_) => true, - } - } - /// Returns true if this response always terminates the stream. pub fn close_after(&self) -> bool { !matches!(self, RPCCodedResponse::Success(_)) diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index 09ef6185957..8ea7b84bc95 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -91,8 +91,8 @@ impl OutboundRequest { } /* These functions are used in the handler for stream management */ - /// Number of responses expected for this request. - pub fn expected_responses(&self) -> u64 { + /// Maximum number of responses expected for this request. + pub fn max_responses(&self) -> u64 { match self { OutboundRequest::Status(_) => 1, OutboundRequest::Goodbye(_) => 0, @@ -105,6 +105,19 @@ impl OutboundRequest { } } + pub fn expect_exactly_one_response(&self) -> bool { + match self { + OutboundRequest::Status(_) => true, + OutboundRequest::Goodbye(_) => false, + OutboundRequest::BlocksByRange(_) => false, + OutboundRequest::BlocksByRoot(_) => false, + OutboundRequest::BlobsByRange(_) => false, + OutboundRequest::BlobsByRoot(_) => false, + OutboundRequest::Ping(_) => true, + OutboundRequest::MetaData(_) => true, + } + } + /// Gives the corresponding `SupportedProtocol` to this request. pub fn versioned_protocol(&self) -> SupportedProtocol { match self { diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 6ff72f658a8..f65586087c2 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -654,8 +654,8 @@ pub enum InboundRequest { impl InboundRequest { /* These functions are used in the handler for stream management */ - /// Number of responses expected for this request. - pub fn expected_responses(&self) -> u64 { + /// Maximum number of responses expected for this request. + pub fn max_responses(&self) -> u64 { match self { InboundRequest::Status(_) => 1, InboundRequest::Goodbye(_) => 0, diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 801a4af54b3..b304eb546da 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -228,7 +228,7 @@ impl RPCRateLimiterBuilder { pub trait RateLimiterItem { fn protocol(&self) -> Protocol; - fn expected_responses(&self) -> u64; + fn max_responses(&self) -> u64; } impl RateLimiterItem for super::InboundRequest { @@ -236,8 +236,8 @@ impl RateLimiterItem for super::InboundRequest { self.versioned_protocol().protocol() } - fn expected_responses(&self) -> u64 { - self.expected_responses() + fn max_responses(&self) -> u64 { + self.max_responses() } } @@ -246,8 +246,8 @@ impl RateLimiterItem for super::OutboundRequest { self.versioned_protocol().protocol() } - fn expected_responses(&self) -> u64 { - self.expected_responses() + fn max_responses(&self) -> u64 { + self.max_responses() } } impl RPCRateLimiter { @@ -299,7 +299,7 @@ impl RPCRateLimiter { request: &Item, ) -> Result<(), RateLimitedErr> { let time_since_start = self.init_time.elapsed(); - let tokens = request.expected_responses().max(1); + let tokens = request.max_responses().max(1); let check = |limiter: &mut Limiter| limiter.allows(time_since_start, peer_id, tokens);