Skip to content

Commit

Permalink
Allow 1 count block request to return 0 blocks (#5554)
Browse files Browse the repository at this point in the history
* Allow 1 count block request to return 0 blocks

* Address @pawanjay176 review
  • Loading branch information
dapplion authored Apr 12, 2024
1 parent 6bac5ce commit 5fdd3b3
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 72 deletions.
81 changes: 40 additions & 41 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ struct InboundInfo<E: EthSpec> {
/// Protocol of the original request we received from the peer.
protocol: Protocol,
/// Responses that the peer is still expecting from us.
remaining_chunks: u64,
max_remaining_chunks: u64,
/// Useful to timing how long each request took to process. Currently only used by
/// BlocksByRange.
request_start_time: Instant,
Expand All @@ -180,7 +180,7 @@ struct OutboundInfo<Id, E: EthSpec> {
/// Info over the protocol this substream is handling.
proto: Protocol,
/// Number of chunks to be seen from the peer's response.
remaining_chunks: Option<u64>,
max_remaining_chunks: Option<u64>,
/// `Id` as given by the application that sent the request.
req_id: Id,
}
Expand Down Expand Up @@ -471,7 +471,7 @@ where
// Process one more message if one exists.
if let Some(message) = info.pending_items.pop_front() {
// If this is the last chunk, terminate the stream.
let last_chunk = info.remaining_chunks <= 1;
let last_chunk = info.max_remaining_chunks <= 1;
let fut =
send_message_to_inbound_substream(substream, message, last_chunk)
.boxed();
Expand Down Expand Up @@ -537,7 +537,8 @@ where
{
// The substream is still active, decrement the remaining
// chunks expected.
info.remaining_chunks = info.remaining_chunks.saturating_sub(1);
info.max_remaining_chunks =
info.max_remaining_chunks.saturating_sub(1);

// If this substream has not ended, we reset the timer.
// Each chunk is allowed RESPONSE_TIMEOUT to be sent.
Expand All @@ -552,7 +553,7 @@ where
// Process one more message if one exists.
if let Some(message) = info.pending_items.pop_front() {
// If this is the last chunk, terminate the stream.
let last_chunk = info.remaining_chunks <= 1;
let last_chunk = info.max_remaining_chunks <= 1;
let fut = send_message_to_inbound_substream(
substream, message, last_chunk,
)
Expand Down Expand Up @@ -664,15 +665,19 @@ where
request,
} => match substream.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(response))) => {
if request.expected_responses() > 1 && !response.close_after() {
if request.expect_exactly_one_response() || response.close_after() {
// either this is a single response request or this response closes the
// stream
entry.get_mut().state = OutboundSubstreamState::Closing(substream);
} else {
let substream_entry = entry.get_mut();
let delay_key = &substream_entry.delay_key;
// chunks left after this one
let remaining_chunks = substream_entry
.remaining_chunks
let max_remaining_chunks = substream_entry
.max_remaining_chunks
.map(|count| count.saturating_sub(1))
.unwrap_or_else(|| 0);
if remaining_chunks == 0 {
if max_remaining_chunks == 0 {
// this is the last expected message, close the stream as all expected chunks have been received
substream_entry.state = OutboundSubstreamState::Closing(substream);
} else {
Expand All @@ -682,14 +687,10 @@ where
substream,
request,
};
substream_entry.remaining_chunks = Some(remaining_chunks);
substream_entry.max_remaining_chunks = Some(max_remaining_chunks);
self.outbound_substreams_delay
.reset(delay_key, self.resp_timeout);
}
} else {
// either this is a single response request or this response closes the
// stream
entry.get_mut().state = OutboundSubstreamState::Closing(substream);
}

// Check what type of response we got and report it accordingly
Expand Down Expand Up @@ -725,7 +726,16 @@ where
self.outbound_substreams_delay.remove(delay_key);
entry.remove_entry();
// notify the application error
if request.expected_responses() > 1 {
if request.expect_exactly_one_response() {
// return an error, stream should not have closed early.
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Err(HandlerErr::Outbound {
id: request_id,
proto: request.versioned_protocol().protocol(),
error: RPCError::IncompleteStream,
}),
));
} else {
// return an end of stream result
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Ok(RPCReceived::EndOfStream(
Expand All @@ -734,16 +744,6 @@ where
)),
));
}

// else we return an error, stream should not have closed early.
let outbound_err = HandlerErr::Outbound {
id: request_id,
proto: request.versioned_protocol().protocol(),
error: RPCError::IncompleteStream,
};
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerEvent::Err(outbound_err),
));
}
Poll::Pending => {
entry.get_mut().state =
Expand Down Expand Up @@ -880,10 +880,10 @@ where
}

let (req, substream) = substream;
let expected_responses = req.expected_responses();
let max_responses = req.max_responses();

// store requests that expect responses
if expected_responses > 0 {
if max_responses > 0 {
if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS {
// Store the stream and tag the output.
let delay_key = self
Expand All @@ -894,14 +894,13 @@ where
self.current_inbound_substream_id,
InboundInfo {
state: awaiting_stream,
pending_items: VecDeque::with_capacity(std::cmp::min(
expected_responses,
128,
) as usize),
pending_items: VecDeque::with_capacity(
std::cmp::min(max_responses, 128) as usize
),
delay_key: Some(delay_key),
protocol: req.versioned_protocol().protocol(),
request_start_time: Instant::now(),
remaining_chunks: expected_responses,
max_remaining_chunks: max_responses,
},
);
} else {
Expand Down Expand Up @@ -948,8 +947,14 @@ where
}

// add the stream to substreams if we expect a response, otherwise drop the stream.
let expected_responses = request.expected_responses();
if expected_responses > 0 {
let max_responses = request.max_responses();
if max_responses > 0 {
let max_remaining_chunks = if request.expect_exactly_one_response() {
// Currently enforced only for multiple responses
None
} else {
Some(max_responses)
};
// new outbound request. Store the stream and tag the output.
let delay_key = self
.outbound_substreams_delay
Expand All @@ -958,12 +963,6 @@ where
substream: Box::new(substream),
request,
};
let expected_responses = if expected_responses > 1 {
// Currently enforced only for multiple responses
Some(expected_responses)
} else {
None
};
if self
.outbound_substreams
.insert(
Expand All @@ -972,7 +971,7 @@ where
state: awaiting_stream,
delay_key,
proto,
remaining_chunks: expected_responses,
max_remaining_chunks,
req_id: id,
},
)
Expand Down
21 changes: 0 additions & 21 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -483,27 +483,6 @@ impl<E: EthSpec> RPCCodedResponse<E> {
RPCCodedResponse::Error(code, err)
}

/// Specifies which response allows for multiple chunks for the stream handler.
pub fn multiple_responses(&self) -> bool {
match self {
RPCCodedResponse::Success(resp) => match resp {
RPCResponse::Status(_) => false,
RPCResponse::BlocksByRange(_) => true,
RPCResponse::BlocksByRoot(_) => true,
RPCResponse::BlobsByRange(_) => true,
RPCResponse::BlobsByRoot(_) => true,
RPCResponse::Pong(_) => false,
RPCResponse::MetaData(_) => false,
RPCResponse::LightClientBootstrap(_) => false,
RPCResponse::LightClientOptimisticUpdate(_) => false,
RPCResponse::LightClientFinalityUpdate(_) => false,
},
RPCCodedResponse::Error(_, _) => true,
// Stream terminations are part of responses that have chunks
RPCCodedResponse::StreamTermination(_) => true,
}
}

/// Returns true if this response always terminates the stream.
pub fn close_after(&self) -> bool {
!matches!(self, RPCCodedResponse::Success(_))
Expand Down
17 changes: 15 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ impl<E: EthSpec> OutboundRequest<E> {
}
/* These functions are used in the handler for stream management */

/// Number of responses expected for this request.
pub fn expected_responses(&self) -> u64 {
/// Maximum number of responses expected for this request.
pub fn max_responses(&self) -> u64 {
match self {
OutboundRequest::Status(_) => 1,
OutboundRequest::Goodbye(_) => 0,
Expand All @@ -105,6 +105,19 @@ impl<E: EthSpec> OutboundRequest<E> {
}
}

pub fn expect_exactly_one_response(&self) -> bool {
match self {
OutboundRequest::Status(_) => true,
OutboundRequest::Goodbye(_) => false,
OutboundRequest::BlocksByRange(_) => false,
OutboundRequest::BlocksByRoot(_) => false,
OutboundRequest::BlobsByRange(_) => false,
OutboundRequest::BlobsByRoot(_) => false,
OutboundRequest::Ping(_) => true,
OutboundRequest::MetaData(_) => true,
}
}

/// Gives the corresponding `SupportedProtocol` to this request.
pub fn versioned_protocol(&self) -> SupportedProtocol {
match self {
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -654,8 +654,8 @@ pub enum InboundRequest<E: EthSpec> {
impl<E: EthSpec> InboundRequest<E> {
/* These functions are used in the handler for stream management */

/// Number of responses expected for this request.
pub fn expected_responses(&self) -> u64 {
/// Maximum number of responses expected for this request.
pub fn max_responses(&self) -> u64 {
match self {
InboundRequest::Status(_) => 1,
InboundRequest::Goodbye(_) => 0,
Expand Down
12 changes: 6 additions & 6 deletions beacon_node/lighthouse_network/src/rpc/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,16 +228,16 @@ impl RPCRateLimiterBuilder {

pub trait RateLimiterItem {
fn protocol(&self) -> Protocol;
fn expected_responses(&self) -> u64;
fn max_responses(&self) -> u64;
}

impl<E: EthSpec> RateLimiterItem for super::InboundRequest<E> {
fn protocol(&self) -> Protocol {
self.versioned_protocol().protocol()
}

fn expected_responses(&self) -> u64 {
self.expected_responses()
fn max_responses(&self) -> u64 {
self.max_responses()
}
}

Expand All @@ -246,8 +246,8 @@ impl<E: EthSpec> RateLimiterItem for super::OutboundRequest<E> {
self.versioned_protocol().protocol()
}

fn expected_responses(&self) -> u64 {
self.expected_responses()
fn max_responses(&self) -> u64 {
self.max_responses()
}
}
impl RPCRateLimiter {
Expand Down Expand Up @@ -299,7 +299,7 @@ impl RPCRateLimiter {
request: &Item,
) -> Result<(), RateLimitedErr> {
let time_since_start = self.init_time.elapsed();
let tokens = request.expected_responses().max(1);
let tokens = request.max_responses().max(1);

let check =
|limiter: &mut Limiter<PeerId>| limiter.allows(time_since_start, peer_id, tokens);
Expand Down

0 comments on commit 5fdd3b3

Please sign in to comment.