Skip to content

Commit

Permalink
Merge pull request #257 from EspressoSystems/sishan/events_service_co…
Browse files Browse the repository at this point in the history
…nnection

Fix the events service connection
  • Loading branch information
dailinsubjam authored Dec 3, 2024
2 parents da10d18 + 79b8202 commit db39cfe
Showing 1 changed file with 80 additions and 15 deletions.
95 changes: 80 additions & 15 deletions crates/shared/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,21 +161,32 @@ impl<Types: NodeType, ApiVer: StaticVersionType + 'static> EventServiceStream<Ty
let stream = unfold(this, |mut this| async move {
loop {
match &mut this.connection {
Left(connection) => match connection.next().await {
Some(Ok(event)) => {
return Some((event, this));
Left(connection) => {
match tokio::time::timeout(Self::RETRY_PERIOD, connection.next()).await {
Ok(Some(Ok(event))) => {
return Some((event, this));
}
Ok(Some(Err(err))) => {
warn!(?err, "Error in event stream");
continue;
}
Ok(None) => {
warn!("Event stream ended, attempting reconnection");
let fut = Self::connect_inner(this.api_url.clone());
let _ =
std::mem::replace(&mut this.connection, Right(Box::pin(fut)));
continue;
}
Err(_) => {
// Timeout occurred, reconnect
warn!("Timeout waiting for next event; reconnecting");
let fut = Self::connect_inner(this.api_url.clone());
let _ =
std::mem::replace(&mut this.connection, Right(Box::pin(fut)));
continue;
}
}
Some(Err(err)) => {
warn!(?err, "Error in event stream");
continue;
}
None => {
warn!("Event stream ended, attempting reconnection");
let fut = Self::connect_inner(this.api_url.clone());
let _ = std::mem::replace(&mut this.connection, Right(Box::pin(fut)));
continue;
}
},
}
Right(reconnection) => match reconnection.await {
Ok(connection) => {
let _ = std::mem::replace(&mut this.connection, Left(connection));
Expand Down Expand Up @@ -218,6 +229,7 @@ mod tests {
use hotshot_types::{data::ViewNumber, traits::node_implementation::ConsensusTime};
use tide_disco::{method::ReadState, App};
use tokio::{spawn, task::JoinHandle, time::timeout};
use tracing::debug;
use url::Url;
use vbs::version::StaticVersion;

Expand Down Expand Up @@ -281,7 +293,7 @@ mod tests {
}

#[tokio::test(flavor = "multi_thread")]
async fn event_stream_wrapper() {
async fn test_event_stream_wrapper() {
const TIMEOUT: Duration = Duration::from_secs(3);

let url: Url = format!(
Expand Down Expand Up @@ -323,6 +335,59 @@ mod tests {
.await
.expect_err("API is reachable, but is on wrong path");
}

#[tokio::test(flavor = "multi_thread")]
async fn test_event_stream_wrapper_with_idle_timeout() {
const TIMEOUT: Duration = Duration::from_secs(3);

let url: Url = format!(
"http://localhost:{}",
portpicker::pick_unused_port().unwrap()
)
.parse()
.unwrap();

let app_handle = run_app("hotshot-events", url.clone());

let mut stream = EventServiceStream::<TestTypes, MockVersion>::connect(url.clone())
.await
.unwrap();

// The stream should work when the server is running
timeout(TIMEOUT, stream.next())
.await
.expect("When mock event server is spawned, stream should work")
.unwrap();

// Simulate idle timeout by stopping the server and waiting
app_handle.abort();
tokio::time::sleep(
EventServiceStream::<TestTypes, MockVersion>::RETRY_PERIOD + Duration::from_millis(500),
)
.await; // Wait longer than idle timeout
// Check whether stream returns Err(_) after idle timeout
match timeout(
EventServiceStream::<TestTypes, MockVersion>::RETRY_PERIOD,
stream.next(),
)
.await
{
Ok(Some(_)) => panic!("Expected error after idle timeout but got an event"),
Ok(None) => panic!("Expected error but got None"),
Err(err) => debug!("Stream returned an error after idle timeout: {:?}", err),
}

// Stream should reconnect after idle timeout
let new_app_handle = run_app("hotshot-events", url.clone());

timeout(TIMEOUT, stream.next())
.await
.expect("After idle timeout, stream should reconnect when the server restarts")
.unwrap();

// Cleanup
new_app_handle.abort();
}
}

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
Expand Down

0 comments on commit db39cfe

Please sign in to comment.