diff --git a/cli/src/torii/routing.rs b/cli/src/torii/routing.rs index 30c78a3ed6a..02cb5d63d10 100644 --- a/cli/src/torii/routing.rs +++ b/cli/src/torii/routing.rs @@ -233,18 +233,19 @@ mod subscription { /// There should be a [`warp::filters::ws::Message::close()`] message to end subscription #[iroha_futures::telemetry_future] pub async fn handle_subscription(events: EventsSender, stream: WebSocket) -> eyre::Result<()> { - match subscribe_forever(events, stream).await { - Ok(()) | Err(Error::CloseMessage) => Ok(()), + let mut consumer = Consumer::new(stream).await?; + + match subscribe_forever(events, &mut consumer).await { + Ok(()) | Err(Error::CloseMessage) => consumer.close_stream().await.map_err(Into::into), Err(err) => Err(err.into()), } } - /// Make endless `stream` subscription for `events` + /// Make endless `consumer` subscription for `events` /// /// Ideally should return `Result` cause it either runs forever either returns `Err` variant - async fn subscribe_forever(events: EventsSender, stream: WebSocket) -> Result<()> { + async fn subscribe_forever(events: EventsSender, consumer: &mut Consumer) -> Result<()> { let mut events = events.subscribe(); - let mut consumer = Consumer::new(stream).await?; loop { tokio::select! { diff --git a/client/src/client.rs b/client/src/client.rs index f50fced0da3..0194d2a7489 100644 --- a/client/src/client.rs +++ b/client/src/client.rs @@ -612,6 +612,23 @@ impl Iterator for EventIterator { } } +impl Drop for EventIterator { + fn drop(&mut self) { + let mut close = || -> eyre::Result<()> { + self.stream.close(None)?; + let mes = self.stream.read_message()?; + if !mes.is_close() { + return Err(eyre!( + "Server hasn't sent `Close` message for websocket handshake" + )); + } + Ok(()) + }; + + let _ = close().map_err(|e| warn!(%e)); + } +} + impl Debug for Client { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("Client") diff --git a/core/src/event.rs b/core/src/event.rs index 8a1350c6949..1719e562c5f 100644 --- a/core/src/event.rs +++ b/core/src/event.rs @@ -111,4 +111,12 @@ impl Consumer { } Err(Error::CantReceiveMessage) } + + /// Close stream. See [`WebSocket::close()`] + /// + /// # Errors + /// Throws up [`WebSocket::close()`] errors + pub async fn close_stream(self) -> Result<()> { + self.stream.close().await.map_err(Into::into) + } }