
Commit

Merge of #6488
mergify[bot] authored Apr 13, 2023
2 parents 541981e + 25b68a7 commit 392b7cc
Showing 1 changed file with 48 additions and 17 deletions.
65 changes: 48 additions & 17 deletions zebra-network/src/peer/connection.rs
@@ -595,22 +595,45 @@ where
match self.state {
State::AwaitingRequest => {
trace!("awaiting client request or peer message");
-// CORRECTNESS
+// # Correctness
//
-// Currently, select prefers the first future if multiple
-// futures are ready.
+// Currently, select prefers the first future if multiple futures are ready.
+// We use this behaviour to prioritise messages on each individual peer
+// connection in this order:
+// - incoming messages from the remote peer, then
+// - outgoing messages to the remote peer.
//
-// The peer can starve client requests if it sends an
-// uninterrupted series of messages. But this is unlikely in
-// practice, due to network delays.
+// This improves the performance of peer responses to Zebra requests, and new
+// peer requests to Zebra's inbound service.
//
-// If both futures are ready, there's no particular reason
-// to prefer one over the other.
+// `futures::StreamExt::next()` is cancel-safe:
+// <https://docs.rs/tokio/latest/tokio/macro.select.html#cancellation-safety>
+// This means that messages from the future that isn't selected stay in the stream,
+// and they will be returned next time the future is checked.
//
-// TODO: use `futures::select!`, which chooses a ready future
-// at random, avoiding starvation
-// (To use `select!`, we'll need to map the different
-// results to a new enum types.)
+// If an inbound peer message arrives at a ready peer that also has a pending
+// request from Zebra, we want to process the peer's message first.
+// If we process the Zebra request first:
+// - we could misinterpret the inbound peer message as a response to the Zebra
+//   request, or
+// - if the peer message is a request to Zebra, and we put the peer in the
+//   AwaitingResponse state, then we'll correctly ignore the simultaneous Zebra
+//   request. (Zebra services make multiple requests or retry, so this is ok.)
+//
+// # Security
+//
+// If a peer sends an uninterrupted series of messages, it will delay any new
+// requests from Zebra to that individual peer. This is behaviour we want,
+// because:
+// - any responses to Zebra's requests to that peer would be slow or timeout,
+// - the peer will eventually fail a Zebra keepalive check and get disconnected,
+// - if there are too many inbound messages overall, the inbound service will
+//   return an overload error and the peer will be disconnected.
+//
+// Messages to other peers will continue to be processed concurrently. Some
+// Zebra services might be temporarily delayed until the peer times out, if a
+// request to that peer is sent by the service, and the service blocks until
+// the request completes (or times out).
match future::select(peer_rx.next(), self.client_rx.next()).await {
Either::Left((None, _)) => {
self.fail_with(PeerError::ConnectionClosed);
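
To make the first hunk's reasoning concrete, here is a minimal, self-contained sketch. It is not Zebra's code: the stream contents and variable names are invented stand-ins for `peer_rx` and `client_rx`. It shows that `futures::future::select` resolves to its first argument when both futures are ready, and that `StreamExt::next()` is cancel-safe, so the unselected message stays in its stream:

```rust
use futures::executor::block_on;
use futures::future::{self, Either};
use futures::{stream, StreamExt};

fn main() {
    block_on(async {
        // Stand-ins for the connection's `peer_rx` (inbound peer messages) and
        // `client_rx` (pending Zebra requests); both are immediately ready.
        let mut peer_rx = stream::iter(vec!["inbound peer message"]);
        let mut client_rx = stream::iter(vec!["outgoing Zebra request"]);

        // `select` polls its first argument first, so when both streams have an
        // item ready, the inbound peer message is chosen.
        match future::select(peer_rx.next(), client_rx.next()).await {
            Either::Left((msg, _unselected_client_next)) => {
                assert_eq!(msg, Some("inbound peer message"));
            }
            Either::Right(_) => unreachable!("peer_rx is polled first"),
        }

        // `next()` is cancel-safe: dropping the unselected future does not lose
        // the Zebra request, which is still waiting in `client_rx`.
        assert_eq!(client_rx.next().await, Some("outgoing Zebra request"));
    });
}
```

Dropping the unselected `Next` future is the situation the linked tokio cancellation-safety notes describe: the item stays buffered in the stream rather than being lost.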
@@ -701,13 +724,21 @@ where
.as_mut()
.expect("timeout must be set while awaiting response");

-// CORRECTNESS
+// # Security
//
+// select() prefers the first future if multiple futures are ready.
+//
+// If multiple futures are ready, we want the priority for each individual
+// connection to be:
+// - cancellation, then
+// - timeout, then
+// - peer responses.
//
-// Currently, select prefers the first future if multiple
-// futures are ready.
+// (Messages to other peers are processed concurrently.)
+//
-// If multiple futures are ready, we want the cancellation
-// to take priority, then the timeout, then peer responses.
+// This makes sure a peer can't block disconnection or timeouts by sending too
+// many messages. It also avoids doing work to process messages after a
+// connection has failed.
let cancel = future::select(tx.cancellation(), timer_ref);
match future::select(cancel, peer_rx.next())
.instrument(span.clone())
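
The second hunk's nested `select` can be illustrated with a similar hedged sketch, again using stand-in futures rather than Zebra's `tx.cancellation()`, response timer, and `peer_rx` types. Because each `future::select` polls its first argument first, cancellation wins over the timeout, and both win over inbound peer messages:

```rust
use futures::executor::block_on;
use futures::future::{self, Either};
use futures::{stream, StreamExt};

fn main() {
    block_on(async {
        // Stand-ins for `tx.cancellation()`, the response timer, and `peer_rx`;
        // all three are ready at once to exercise the priority ordering.
        let cancellation = future::ready("request cancelled");
        let timeout = future::ready("response timed out");
        let mut peer_rx = stream::iter(vec!["peer response"]);

        // Inner select: cancellation is polled before the timeout.
        let cancel_or_timeout = future::select(cancellation, timeout);

        // Outer select: cancellation and timeout are polled before peer messages.
        match future::select(cancel_or_timeout, peer_rx.next()).await {
            Either::Left((Either::Left((reason, _timeout)), _peer_next)) => {
                // Highest priority: the request was cancelled.
                assert_eq!(reason, "request cancelled");
            }
            Either::Left((Either::Right(_), _)) => {
                unreachable!("the cancellation future is polled before the timeout")
            }
            Either::Right(_) => {
                unreachable!("cancellation and timeout are polled before peer messages")
            }
        }
    });
}
```

Nesting `future::select` this way gives a fixed polling order, which is exactly the property the comment relies on; the randomized `futures::select!` macro mentioned in the removed TODO would not preserve it.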
