diff --git a/swap/src/kraken.rs b/swap/src/kraken.rs index 69dc4bb56..ca2dd9e92 100644 --- a/swap/src/kraken.rs +++ b/swap/src/kraken.rs @@ -22,17 +22,44 @@ pub fn connect() -> Result { async move { let mut stream = connection::new().await?; - while let Some(update) = stream.try_next().await.map_err(to_backoff)? { - let send_result = rate_update.send(Ok(update)); - - if send_result.is_err() { - return Err(backoff::Error::Permanent(anyhow!( - "receiver disconnected" - ))); + loop { + let update = stream.try_next().await; + + match update { + Ok(rate) => { + if let Some(rate) = rate { + let send_result = rate_update.send(Ok(rate)); + + if send_result.is_err() { + return Err(backoff::Error::Permanent(anyhow!( + "receiver disconnected" + ))); + } + } + } + Err(e) => { + match e { + // Connection closures and websocket errors will be retried + connection::Error::ConnectionClosed => { + // Try to renew the connection in case of websocket failure + // This failure can be caused by Cloudflare closing he connection for specific IPs + stream = connection::new().await?; + return Err(backoff::Error::Transient(anyhow::Error::from(e))); + }, + connection::Error::WebSocket(_) => { + // Try to renew the connection in case of websocket failure + // This failure can be caused by Cloudflare closing he connection for specific IPs + stream = connection::new().await?; + return Err(backoff::Error::Transient(anyhow::Error::from(e))); + }, + + // Failures while parsing a message are permanent because they most likely present a + // programmer error + connection::Error::Parse(_) => { return Err(backoff::Error::Permanent(anyhow::Error::from(e))) }, + } + } } } - - Err(backoff::Error::Transient(anyhow!("stream ended"))) } }, |error, next: Duration| { @@ -84,22 +111,6 @@ pub enum Error { type RateUpdate = Result; -/// Maps a [`connection::Error`] to a backoff error, effectively defining our -/// retry strategy. -fn to_backoff(e: connection::Error) -> backoff::Error { - use backoff::Error::*; - - match e { - // Connection closures and websocket errors will be retried - connection::Error::ConnectionClosed => Transient(anyhow::Error::from(e)), - connection::Error::WebSocket(_) => Transient(anyhow::Error::from(e)), - - // Failures while parsing a message are permanent because they most likely present a - // programmer error - connection::Error::Parse(_) => Permanent(anyhow::Error::from(e)), - } -} - /// Kraken websocket connection module. /// /// Responsible for establishing a connection to the Kraken websocket API and