Skip to content

Commit

Permalink
time: fix wake-up with interval on Ready (tokio-rs#5551)
Browse files Browse the repository at this point in the history
When `tokio::time::Interval::poll_tick()` returns `Poll::Pending`, it
schedules itself for being woken up again through the waker of the
passed context, which is correct behavior.

However when `Poll::Ready(_)` is returned, the interval timer should be
reset but not scheduled to be woken up again as this is up to the
caller.

This commit fixes the bug by introducing a `reset_without_reregister`
method on `TimerEntry` which is called by `Intervall::poll_tick(cx)` in
case the delay poll returns `Poll::Ready(_)`.
  • Loading branch information
sgasse committed Mar 17, 2023
1 parent f177aad commit b2d67ef
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 1 deletion.
11 changes: 11 additions & 0 deletions tokio/src/runtime/time/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,17 @@ impl TimerEntry {
}
}

pub(crate) fn reset_without_reregister(mut self: Pin<&mut Self>, new_time: Instant) {
unsafe { self.as_mut().get_unchecked_mut() }.deadline = new_time;
unsafe { self.as_mut().get_unchecked_mut() }.registered = false;

let tick = self.driver().time_source().deadline_to_tick(new_time);

if self.inner().extend_expiration(tick).is_ok() {
return;
}
}

pub(crate) fn poll_elapsed(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand Down
5 changes: 4 additions & 1 deletion tokio/src/time/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,10 @@ impl Interval {
timeout + self.period
};

self.delay.as_mut().reset(next);
// When we arrive here, the internal delay returned `Poll::Ready`.
// Reset the delay but do not register it. It should be registered with
// the next call to [`poll_tick`].
self.delay.as_mut().reset_without_reregister(next);

// Return the time when we were scheduled to tick
Poll::Ready(timeout)
Expand Down
20 changes: 20 additions & 0 deletions tokio/src/time/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,26 @@ impl Sleep {
self.reset_inner(deadline)
}

/// Resets the `Sleep` instance to a new deadline without reregistering it
/// to be woken up.
///
/// Calling this function allows changing the instant at which the `Sleep`
/// future completes without having to create new associated state and
/// without having it registered. This is required in e.g. the
/// [crate::time::Interval] where we want to reset the internal [Sleep]
/// without having it wake up the last task that polled it.
///
/// This function can be called both before and after the future has
/// completed.
///
/// To call this method, you will usually combine the call with
/// [`Pin::as_mut`], which lets you call the method without consuming the
/// `Sleep` itself.
pub fn reset_without_reregister(self: Pin<&mut Self>, deadline: Instant) {
let mut me = self.project();
me.entry.as_mut().reset_without_reregister(deadline);
}

fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
let mut me = self.project();
me.entry.as_mut().reset(deadline);
Expand Down
73 changes: 73 additions & 0 deletions tokio/tests/time_interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,76 @@ fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> {
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}

mod tmp_tests {
use std::{
pin::Pin,
task::{Context, Poll},
time::Instant,
};

use crate::time::Interval;
use futures::{pin_mut, Stream, StreamExt};

struct IntervalStreamer {
start: Instant,
counter: u32,
timer: Interval,
}

impl Stream for IntervalStreamer {
type Item = u32;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = Pin::into_inner(self);

if this.counter > 12 {
return Poll::Ready(None);
}

match this.timer.poll_tick(cx) {
Poll::Pending => {
println!(
"Timer returned Poll::Pending after {:?}",
this.start.elapsed()
);
Poll::Pending
}
Poll::Ready(_) => {
println!(
"Timer returned Poll::Ready after {:?}",
this.start.elapsed()
);
this.counter += 1;
if this.counter % 4 == 0 {
Poll::Ready(Some(this.counter))
} else {
// Schedule this task for wake-up
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
}
}

#[tokio::test]
async fn reset_without_reregister() {
let stream = IntervalStreamer {
start: Instant::now(),
counter: 0,
timer: crate::time::interval(std::time::Duration::from_millis(10)),
};

pin_mut!(stream);

let mut results = Vec::with_capacity(4);
while let Some(item) = stream.next().await {
println!("Stream yielded an item: {}", item);
results.push(item);
}

dbg!(&results);
assert_eq!(results, vec![4, 8, 12]);
}
}

0 comments on commit b2d67ef

Please sign in to comment.