Skip to content

Commit

Permalink
fix: reset IDLE timeout when keepalive is received
Browse files Browse the repository at this point in the history
This allows to use shorter timeouts
to detect lost connection earlier.
E.g. if timeout is set to 5 minutes
and the server sends keepalives every 2 minutes,
IDLE will never be interrupted.
If connection is lost, it will be noticed not later than 5 minutes later.

`wait` method which does not accept the timeout
now uses default timeout of 24 hours,
but should probably be deprecated.
  • Loading branch information
link2xt committed Dec 11, 2023
1 parent 79dfeb7 commit 73c3119
Showing 1 changed file with 26 additions and 28 deletions.
54 changes: 26 additions & 28 deletions src/extensions/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,44 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Handle<T> {
) -> (
impl Future<Output = Result<IdleResponse>> + '_,
stop_token::StopSource,
) {
self.wait_with_timeout(Duration::from_secs(24 * 60 * 60))
}

/// Start listening to the server side responses.
///
/// Stops after the passed in `timeout` without any response from the server.
/// Timeout is reset by any response, including `* OK Still here` keepalives.
///
/// Must be called after [Handle::init].
pub fn wait_with_timeout(
&mut self,
dur: Duration,
) -> (
impl Future<Output = Result<IdleResponse>> + '_,
stop_token::StopSource,
) {
assert!(
self.id.is_some(),
"Cannot listen to response without starting IDLE"
);

let sender = self.session.unsolicited_responses_tx.clone();

let interrupt = stop_token::StopSource::new();
let raw_stream = IdleStream::new(self);
let mut interruptible_stream = raw_stream.timeout_at(interrupt.token());

let fut = async move {
while let Some(Ok(resp)) = interruptible_stream.next().await {
loop {
let Ok(res) = timeout(dur, interruptible_stream.next()).await else {
return Ok(IdleResponse::Timeout);
};

let Some(Ok(resp)) = res else {
return Ok(IdleResponse::ManualInterrupt);
};

let resp = resp?;
match resp.parsed() {
Response::Data {
Expand All @@ -148,33 +173,6 @@ impl<T: Read + Write + Unpin + fmt::Debug + Send> Handle<T> {
_ => return Ok(IdleResponse::NewData(resp)),
}
}

Ok(IdleResponse::ManualInterrupt)
};

(fut, interrupt)
}

/// Start listening to the server side resonses, stops latest after the passed in `timeout`.
/// Must be called after [Handle::init].
pub fn wait_with_timeout(
&mut self,
dur: Duration,
) -> (
impl Future<Output = Result<IdleResponse>> + '_,
stop_token::StopSource,
) {
assert!(
self.id.is_some(),
"Cannot listen to response without starting IDLE"
);

let (waiter, interrupt) = self.wait();
let fut = async move {
match timeout(dur, waiter).await {
Ok(res) => res,
Err(_err) => Ok(IdleResponse::Timeout),
}
};

(fut, interrupt)
Expand Down

0 comments on commit 73c3119

Please sign in to comment.