From 2562a400291cc0b0915b9cf6d37fce1cc3d4f836 Mon Sep 17 00:00:00 2001 From: Daniil Date: Wed, 13 Apr 2022 10:02:36 +0300 Subject: [PATCH] [fix] #2005: Fix `Client::listen_for_events()` not closing WebSocket stream (#2095) --- cli/src/torii/routing.rs | 11 ++++++----- client/src/client.rs | 17 +++++++++++++++++ core/src/event.rs | 8 ++++++++ 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/cli/src/torii/routing.rs b/cli/src/torii/routing.rs index 30c78a3ed6a..02cb5d63d10 100644 --- a/cli/src/torii/routing.rs +++ b/cli/src/torii/routing.rs @@ -233,18 +233,19 @@ mod subscription { /// There should be a [`warp::filters::ws::Message::close()`] message to end subscription #[iroha_futures::telemetry_future] pub async fn handle_subscription(events: EventsSender, stream: WebSocket) -> eyre::Result<()> { - match subscribe_forever(events, stream).await { - Ok(()) | Err(Error::CloseMessage) => Ok(()), + let mut consumer = Consumer::new(stream).await?; + + match subscribe_forever(events, &mut consumer).await { + Ok(()) | Err(Error::CloseMessage) => consumer.close_stream().await.map_err(Into::into), Err(err) => Err(err.into()), } } - /// Make endless `stream` subscription for `events` + /// Make endless `consumer` subscription for `events` /// /// Ideally should return `Result` cause it either runs forever either returns `Err` variant - async fn subscribe_forever(events: EventsSender, stream: WebSocket) -> Result<()> { + async fn subscribe_forever(events: EventsSender, consumer: &mut Consumer) -> Result<()> { let mut events = events.subscribe(); - let mut consumer = Consumer::new(stream).await?; loop { tokio::select! { diff --git a/client/src/client.rs b/client/src/client.rs index f50fced0da3..0194d2a7489 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -612,6 +612,23 @@ impl Iterator for EventIterator { } } +impl Drop for EventIterator { + fn drop(&mut self) { + let mut close = || -> eyre::Result<()> { + self.stream.close(None)?; + let mes = self.stream.read_message()?; + if !mes.is_close() { + return Err(eyre!( + "Server hasn't sent `Close` message for websocket handshake" + )); + } + Ok(()) + }; + + let _ = close().map_err(|e| warn!(%e)); + } +} + impl Debug for Client { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("Client") diff --git a/core/src/event.rs b/core/src/event.rs index 8a1350c6949..1719e562c5f 100644 --- a/core/src/event.rs +++ b/core/src/event.rs @@ -111,4 +111,12 @@ impl Consumer { } Err(Error::CantReceiveMessage) } + + /// Close stream. See [`WebSocket::close()`] + /// + /// # Errors + /// Throws up [`WebSocket::close()`] errors + pub async fn close_stream(self) -> Result<()> { + self.stream.close().await.map_err(Into::into) + } }