diff --git a/src/extensions/idle.rs b/src/extensions/idle.rs index f54b58f..2a981ed 100644 --- a/src/extensions/idle.rs +++ b/src/extensions/idle.rs @@ -119,11 +119,28 @@ impl Handle { ) -> ( impl Future> + '_, stop_token::StopSource, + ) { + self.wait_with_timeout(Duration::from_secs(24 * 60 * 60)) + } + + /// Start listening to the server side responses. + /// + /// Stops after the passed in `timeout` without any response from the server. + /// Timeout is reset by any response, including `* OK Still here` keepalives. + /// + /// Must be called after [Handle::init]. + pub fn wait_with_timeout( + &mut self, + dur: Duration, + ) -> ( + impl Future> + '_, + stop_token::StopSource, ) { assert!( self.id.is_some(), "Cannot listen to response without starting IDLE" ); + let sender = self.session.unsolicited_responses_tx.clone(); let interrupt = stop_token::StopSource::new(); @@ -131,7 +148,15 @@ impl Handle { let mut interruptible_stream = raw_stream.timeout_at(interrupt.token()); let fut = async move { - while let Some(Ok(resp)) = interruptible_stream.next().await { + loop { + let Ok(res) = timeout(dur, interruptible_stream.next()).await else { + return Ok(IdleResponse::Timeout); + }; + + let Some(Ok(resp)) = res else { + return Ok(IdleResponse::ManualInterrupt); + }; + let resp = resp?; match resp.parsed() { Response::Data { @@ -148,33 +173,6 @@ impl Handle { _ => return Ok(IdleResponse::NewData(resp)), } } - - Ok(IdleResponse::ManualInterrupt) - }; - - (fut, interrupt) - } - - /// Start listening to the server side resonses, stops latest after the passed in `timeout`. - /// Must be called after [Handle::init]. - pub fn wait_with_timeout( - &mut self, - dur: Duration, - ) -> ( - impl Future> + '_, - stop_token::StopSource, - ) { - assert!( - self.id.is_some(), - "Cannot listen to response without starting IDLE" - ); - - let (waiter, interrupt) = self.wait(); - let fut = async move { - match timeout(dur, waiter).await { - Ok(res) => res, - Err(_err) => Ok(IdleResponse::Timeout), - } }; (fut, interrupt)