Skip to content

Commit

Permalink
src/connection: Process command or socket result immediately (#138)
Browse files Browse the repository at this point in the history
> The `frame` future might be _ready_ with an `Error` from the underlying
socket (i.e. here `libp2p-websocket`). Though given that the result of the
`control_command` `Future` is handled first, `on_control_command` is called
despite `frame` having returned an `Error`. `on_control_command` itself may try
to write to the underlying socket, which will panic given that the socket
returned an error earlier via the `frame` `Future`.

With this patch, once any of `next_stream_command`, `next_control_command` or
`next_frame` `Future` is ready, the result is processed right away, without
additionally polling the remaining pending `Future`s, thus surfacing errors as
early as possible.

See libp2p/rust-libp2p#2598 for details.
  • Loading branch information
mxinden authored Aug 5, 2022
1 parent 233199d commit e6e10f8
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 32 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# 0.10.2

- Process command or socket result immediately and thereby no longer accessing
the socket after it returned an error. See [PR 138] for details.

[PR 138]: https://github.com/libp2p/rust-yamux/pull/138

# 0.10.1

- Update `parking_lot` dependency. See [PR 126].
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yamux"
version = "0.10.1"
version = "0.10.2"
authors = ["Parity Technologies <[email protected]>"]
license = "Apache-2.0 OR MIT"
description = "Multiplexer over reliable, ordered connections"
Expand Down
48 changes: 17 additions & 31 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,7 @@ use futures::{
stream::{Fuse, FusedStream},
};
use nohash_hasher::IntMap;
use std::{
fmt, io,
sync::Arc,
task::{Context, Poll},
};
use std::{fmt, io, sync::Arc, task::Poll};

pub use control::Control;
pub use stream::{Packet, State, Stream};
Expand Down Expand Up @@ -427,7 +423,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {

let mut num_terminated = 0;

let mut next_frame = if self.socket.is_terminated() {
let next_frame = if self.socket.is_terminated() {
num_terminated += 1;
Either::Left(future::pending())
} else {
Expand Down Expand Up @@ -458,14 +454,14 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
Either::Right(next_frame)
};

let mut next_stream_command = if self.stream_receiver.is_terminated() {
let next_stream_command = if self.stream_receiver.is_terminated() {
num_terminated += 1;
Either::Left(future::pending())
} else {
Either::Right(self.stream_receiver.next())
};

let mut next_control_command = if self.control_receiver.is_terminated() {
let next_control_command = if self.control_receiver.is_terminated() {
num_terminated += 1;
Either::Left(future::pending())
} else {
Expand All @@ -477,29 +473,19 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Connection<T> {
return Err(ConnectionError::Closed);
}

let next_item = future::poll_fn(move |cx: &mut Context| {
let a = next_stream_command.poll_unpin(cx);
let b = next_control_command.poll_unpin(cx);
let c = next_frame.poll_unpin(cx);
if a.is_pending() && b.is_pending() && c.is_pending() {
return Poll::Pending;
}
Poll::Ready((a, b, c))
});

let (stream_command, control_command, frame) = next_item.await;

if let Poll::Ready(cmd) = control_command {
self.on_control_command(cmd).await?
}

if let Poll::Ready(cmd) = stream_command {
self.on_stream_command(cmd).await?
}

if let Poll::Ready(frame) = frame {
if let Some(stream) = self.on_frame(frame.transpose().map_err(Into::into)).await? {
return Ok(Some(stream));
let combined_future = future::select(
future::select(next_stream_command, next_control_command),
next_frame,
);
match combined_future.await {
Either::Left((Either::Left((cmd, _)), _)) => self.on_stream_command(cmd).await?,
Either::Left((Either::Right((cmd, _)), _)) => self.on_control_command(cmd).await?,
Either::Right((frame, _)) => {
if let Some(stream) =
self.on_frame(frame.transpose().map_err(Into::into)).await?
{
return Ok(Some(stream));
}
}
}
}
Expand Down

0 comments on commit e6e10f8

Please sign in to comment.