Skip to content

Commit

Permalink
[fix] hyperledger-iroha#2005: Fix Client::listen_for_events() not c…
Browse files Browse the repository at this point in the history
…losing WebSocket stream (hyperledger-iroha#2095)
  • Loading branch information
Arjentix authored and mversic committed May 2, 2022
1 parent 4f0620d commit afa5ffa
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
11 changes: 6 additions & 5 deletions cli/src/torii/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,18 +233,19 @@ mod subscription {
/// There should be a [`warp::filters::ws::Message::close()`] message to end subscription
#[iroha_futures::telemetry_future]
pub async fn handle_subscription(events: EventsSender, stream: WebSocket) -> eyre::Result<()> {
match subscribe_forever(events, stream).await {
Ok(()) | Err(Error::CloseMessage) => Ok(()),
let mut consumer = Consumer::new(stream).await?;

match subscribe_forever(events, &mut consumer).await {
Ok(()) | Err(Error::CloseMessage) => consumer.close_stream().await.map_err(Into::into),
Err(err) => Err(err.into()),
}
}

/// Make endless `stream` subscription for `events`
/// Make endless `consumer` subscription for `events`
///
/// Ideally should return `Result<!>` cause it either runs forever either returns `Err` variant
async fn subscribe_forever(events: EventsSender, stream: WebSocket) -> Result<()> {
async fn subscribe_forever(events: EventsSender, consumer: &mut Consumer) -> Result<()> {
let mut events = events.subscribe();
let mut consumer = Consumer::new(stream).await?;

loop {
tokio::select! {
Expand Down
17 changes: 17 additions & 0 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,23 @@ impl Iterator for EventIterator {
}
}

impl Drop for EventIterator {
fn drop(&mut self) {
let mut close = || -> eyre::Result<()> {
self.stream.close(None)?;
let mes = self.stream.read_message()?;
if !mes.is_close() {
return Err(eyre!(
"Server hasn't sent `Close` message for websocket handshake"
));
}
Ok(())
};

let _ = close().map_err(|e| warn!(%e));
}
}

impl Debug for Client {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Client")
Expand Down
8 changes: 8 additions & 0 deletions core/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,12 @@ impl Consumer {
}
Err(Error::CantReceiveMessage)
}

/// Close stream. See [`WebSocket::close()`]
///
/// # Errors
/// Throws up [`WebSocket::close()`] errors
pub async fn close_stream(self) -> Result<()> {
self.stream.close().await.map_err(Into::into)
}
}

0 comments on commit afa5ffa

Please sign in to comment.