diff --git a/CHANGELOG.md b/CHANGELOG.md index 0458fb07..969c7aba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.10.2 + +- Process command or socket result immediately and thereby no longer accessing + the socket after it returned an error. See [PR 138] for details. + +[PR 138]: https://github.com/libp2p/rust-yamux/pull/138 + # 0.10.1 - Update `parking_lot` dependency. See [PR 126]. diff --git a/Cargo.toml b/Cargo.toml index 0b53bbd1..7611049f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "yamux" -version = "0.10.1" +version = "0.10.2" authors = ["Parity Technologies "] license = "Apache-2.0 OR MIT" description = "Multiplexer over reliable, ordered connections" diff --git a/src/connection.rs b/src/connection.rs index c902835a..f61eaedd 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -105,11 +105,7 @@ use futures::{ stream::{Fuse, FusedStream}, }; use nohash_hasher::IntMap; -use std::{ - fmt, io, - sync::Arc, - task::{Context, Poll}, -}; +use std::{fmt, io, sync::Arc, task::Poll}; pub use control::Control; pub use stream::{Packet, State, Stream}; @@ -427,7 +423,7 @@ impl Connection { let mut num_terminated = 0; - let mut next_frame = if self.socket.is_terminated() { + let next_frame = if self.socket.is_terminated() { num_terminated += 1; Either::Left(future::pending()) } else { @@ -458,14 +454,14 @@ impl Connection { Either::Right(next_frame) }; - let mut next_stream_command = if self.stream_receiver.is_terminated() { + let next_stream_command = if self.stream_receiver.is_terminated() { num_terminated += 1; Either::Left(future::pending()) } else { Either::Right(self.stream_receiver.next()) }; - let mut next_control_command = if self.control_receiver.is_terminated() { + let next_control_command = if self.control_receiver.is_terminated() { num_terminated += 1; Either::Left(future::pending()) } else { @@ -477,29 +473,19 @@ impl Connection { return Err(ConnectionError::Closed); } - let next_item = future::poll_fn(move |cx: &mut Context| { - let a = next_stream_command.poll_unpin(cx); - let b = next_control_command.poll_unpin(cx); - let c = next_frame.poll_unpin(cx); - if a.is_pending() && b.is_pending() && c.is_pending() { - return Poll::Pending; - } - Poll::Ready((a, b, c)) - }); - - let (stream_command, control_command, frame) = next_item.await; - - if let Poll::Ready(cmd) = control_command { - self.on_control_command(cmd).await? - } - - if let Poll::Ready(cmd) = stream_command { - self.on_stream_command(cmd).await? - } - - if let Poll::Ready(frame) = frame { - if let Some(stream) = self.on_frame(frame.transpose().map_err(Into::into)).await? { - return Ok(Some(stream)); + let combined_future = future::select( + future::select(next_stream_command, next_control_command), + next_frame, + ); + match combined_future.await { + Either::Left((Either::Left((cmd, _)), _)) => self.on_stream_command(cmd).await?, + Either::Left((Either::Right((cmd, _)), _)) => self.on_control_command(cmd).await?, + Either::Right((frame, _)) => { + if let Some(stream) = + self.on_frame(frame.transpose().map_err(Into::into)).await? + { + return Ok(Some(stream)); + } } } }