Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(webocket): Avoid panic when polling quicksink after errors #5482

Merged
merged 9 commits into from
Jul 9, 2024
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ libp2p-upnp = { version = "0.2.2", path = "protocols/upnp" }
libp2p-webrtc = { version = "0.7.1-alpha", path = "transports/webrtc" }
libp2p-webrtc-utils = { version = "0.2.1", path = "misc/webrtc-utils" }
libp2p-webrtc-websys = { version = "0.3.0-alpha", path = "transports/webrtc-websys" }
libp2p-websocket = { version = "0.43.1", path = "transports/websocket" }
libp2p-websocket = { version = "0.43.2", path = "transports/websocket" }
libp2p-websocket-websys = { version = "0.3.2", path = "transports/websocket-websys" }
libp2p-webtransport-websys = { version = "0.3.0", path = "transports/webtransport-websys" }
libp2p-yamux = { version = "0.45.2", path = "muxers/yamux" }
Expand Down
6 changes: 6 additions & 0 deletions transports/websocket/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 0.43.2

- fix: Avoid websocket panic on polling after errors. See [PR 5482].

[PR 5482]: https://github.com/libp2p/rust-libp2p/pull/5482

## 0.43.1

## 0.43.0
Expand Down
3 changes: 2 additions & 1 deletion transports/websocket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-websocket"
edition = "2021"
rust-version = { workspace = true }
description = "WebSocket transport for libp2p"
version = "0.43.1"
version = "0.43.2"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand All @@ -21,6 +21,7 @@ pin-project-lite = "0.2.14"
rw-stream-sink = { workspace = true }
soketto = "0.8.0"
tracing = { workspace = true }
thiserror = "1.0.61"
url = "2.5"
webpki-roots = "0.25"

Expand Down
2 changes: 1 addition & 1 deletion transports/websocket/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ fn location_to_multiaddr<T>(location: &str) -> Result<Multiaddr, Error<T>> {
/// The websocket connection.
pub struct Connection<T> {
receiver: BoxStream<'static, Result<Incoming, connection::Error>>,
sender: Pin<Box<dyn Sink<OutgoingData, Error = connection::Error> + Send>>,
sender: Pin<Box<dyn Sink<OutgoingData, Error = quicksink::Error<connection::Error>> + Send>>,
_marker: std::marker::PhantomData<T>,
}

Expand Down
72 changes: 49 additions & 23 deletions transports/websocket/src/quicksink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,6 @@
// Ok::<_, io::Error>(stdout)
// });
// ```
//
// # Panics
//
// - If any of the [`Sink`] methods produce an error, the sink transitions
// to a failure state and none of its methods must be called afterwards or
// else a panic will occur.
// - If [`Sink::poll_close`] has been called, no other sink method must be
// called afterwards or else a panic will be caused.

use futures::{ready, sink::Sink};
use pin_project_lite::pin_project;
Expand Down Expand Up @@ -102,6 +94,15 @@ enum State {
Failed,
}

/// Errors the `Sink` may return.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error<E> {
#[error("Error while sending over the sink, {0}")]
Send(E),
#[error("The Sink has closed")]
Closed,
}

pin_project! {
/// `SinkImpl` implements the `Sink` trait.
#[derive(Debug)]
Expand All @@ -119,7 +120,7 @@ where
F: FnMut(S, Action<A>) -> T,
T: Future<Output = Result<S, E>>,
{
type Error = E;
type Error = Error<E>;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
Expand All @@ -135,28 +136,27 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
Poll::Ready(Err(e))
Poll::Ready(Err(Error::Send(e)))
}
}
}
State::Closing => match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) {
Ok(_) => {
this.future.set(None);
*this.state = State::Closed;
panic!("SinkImpl::poll_ready called on a closing sink.")
Poll::Ready(Err(Error::Closed))
}
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
Poll::Ready(Err(e))
Poll::Ready(Err(Error::Send(e)))
}
},
State::Empty => {
assert!(this.param.is_some());
Poll::Ready(Ok(()))
}
State::Closed => panic!("SinkImpl::poll_ready called on a closed sink."),
State::Failed => panic!("SinkImpl::poll_ready called after error."),
State::Closed | State::Failed => Poll::Ready(Err(Error::Closed)),
}
}

Expand Down Expand Up @@ -193,7 +193,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
},
State::Flushing => {
Expand All @@ -207,7 +207,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
}
}
Expand All @@ -221,11 +221,10 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
},
State::Closed => return Poll::Ready(Ok(())),
State::Failed => panic!("SinkImpl::poll_flush called after error."),
State::Closed | State::Failed => return Poll::Ready(Err(Error::Closed)),
}
}
}
Expand Down Expand Up @@ -253,7 +252,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
},
State::Flushing => {
Expand All @@ -266,7 +265,7 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
}
}
Expand All @@ -280,11 +279,11 @@ where
Err(e) => {
this.future.set(None);
*this.state = State::Failed;
return Poll::Ready(Err(e));
return Poll::Ready(Err(Error::Send(e)));
}
},
State::Closed => return Poll::Ready(Ok(())),
State::Failed => panic!("SinkImpl::poll_closed called after error."),
State::Failed => return Poll::Ready(Err(Error::Closed)),
}
}
}
Expand Down Expand Up @@ -347,4 +346,31 @@ mod tests {
assert_eq!(&expected[..], &actual[..])
});
}

#[test]
fn error_does_not_panic() {
task::block_on(async {
let sink = make_sink(io::stdout(), |mut _stdout, _action| async move {
Err(io::Error::new(io::ErrorKind::Other, "oh no"))
});

futures::pin_mut!(sink);

let result = sink.send("hello").await;
match result {
Err(crate::quicksink::Error::Send(e)) => {
assert_eq!(e.kind(), io::ErrorKind::Other);
assert_eq!(e.to_string(), "oh no")
}
_ => panic!("unexpected result: {:?}", result),
};

// Call send again, expect not to panic.
let result = sink.send("hello").await;
match result {
Err(crate::quicksink::Error::Closed) => {}
_ => panic!("unexpected result: {:?}", result),
};
})
}
}
Loading