diff --git a/crates/shared/src/utils.rs b/crates/shared/src/utils.rs index 349bbc9e..e08e1625 100644 --- a/crates/shared/src/utils.rs +++ b/crates/shared/src/utils.rs @@ -161,21 +161,32 @@ impl EventServiceStream match connection.next().await { - Some(Ok(event)) => { - return Some((event, this)); + Left(connection) => { + match tokio::time::timeout(Self::RETRY_PERIOD, connection.next()).await { + Ok(Some(Ok(event))) => { + return Some((event, this)); + } + Ok(Some(Err(err))) => { + warn!(?err, "Error in event stream"); + continue; + } + Ok(None) => { + warn!("Event stream ended, attempting reconnection"); + let fut = Self::connect_inner(this.api_url.clone()); + let _ = + std::mem::replace(&mut this.connection, Right(Box::pin(fut))); + continue; + } + Err(_) => { + // Timeout occurred, reconnect + warn!("Timeout waiting for next event; reconnecting"); + let fut = Self::connect_inner(this.api_url.clone()); + let _ = + std::mem::replace(&mut this.connection, Right(Box::pin(fut))); + continue; + } } - Some(Err(err)) => { - warn!(?err, "Error in event stream"); - continue; - } - None => { - warn!("Event stream ended, attempting reconnection"); - let fut = Self::connect_inner(this.api_url.clone()); - let _ = std::mem::replace(&mut this.connection, Right(Box::pin(fut))); - continue; - } - }, + } Right(reconnection) => match reconnection.await { Ok(connection) => { let _ = std::mem::replace(&mut this.connection, Left(connection)); @@ -218,6 +229,7 @@ mod tests { use hotshot_types::{data::ViewNumber, traits::node_implementation::ConsensusTime}; use tide_disco::{method::ReadState, App}; use tokio::{spawn, task::JoinHandle, time::timeout}; + use tracing::debug; use url::Url; use vbs::version::StaticVersion; @@ -281,7 +293,7 @@ mod tests { } #[tokio::test(flavor = "multi_thread")] - async fn event_stream_wrapper() { + async fn test_event_stream_wrapper() { const TIMEOUT: Duration = Duration::from_secs(3); let url: Url = format!( @@ -323,6 +335,59 @@ mod tests { .await .expect_err("API is reachable, but is on wrong path"); } + + #[tokio::test(flavor = "multi_thread")] + async fn test_event_stream_wrapper_with_idle_timeout() { + const TIMEOUT: Duration = Duration::from_secs(3); + + let url: Url = format!( + "http://localhost:{}", + portpicker::pick_unused_port().unwrap() + ) + .parse() + .unwrap(); + + let app_handle = run_app("hotshot-events", url.clone()); + + let mut stream = EventServiceStream::::connect(url.clone()) + .await + .unwrap(); + + // The stream should work when the server is running + timeout(TIMEOUT, stream.next()) + .await + .expect("When mock event server is spawned, stream should work") + .unwrap(); + + // Simulate idle timeout by stopping the server and waiting + app_handle.abort(); + tokio::time::sleep( + EventServiceStream::::RETRY_PERIOD + Duration::from_millis(500), + ) + .await; // Wait longer than idle timeout + // Check whether stream returns Err(_) after idle timeout + match timeout( + EventServiceStream::::RETRY_PERIOD, + stream.next(), + ) + .await + { + Ok(Some(_)) => panic!("Expected error after idle timeout but got an event"), + Ok(None) => panic!("Expected error but got None"), + Err(err) => debug!("Stream returned an error after idle timeout: {:?}", err), + } + + // Stream should reconnect after idle timeout + let new_app_handle = run_app("hotshot-events", url.clone()); + + timeout(TIMEOUT, stream.next()) + .await + .expect("After idle timeout, stream should reconnect when the server restarts") + .unwrap(); + + // Cleanup + new_app_handle.abort(); + } } #[derive(Debug, Clone, Hash, PartialEq, Eq)]