Skip to content

Commit

Permalink
time: lazily init timers on first poll (#6512)
Browse files Browse the repository at this point in the history
  • Loading branch information
wathenjiang authored May 3, 2024
1 parent b7d4fba commit f6eb1ee
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 19 deletions.
5 changes: 5 additions & 0 deletions benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,8 @@ harness = false
name = "time_now"
path = "time_now.rs"
harness = false

[[bench]]
name = "time_timeout"
path = "time_timeout.rs"
harness = false
109 changes: 109 additions & 0 deletions benches/time_timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
use std::time::{Duration, Instant};

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use tokio::{
runtime::Runtime,
time::{sleep, timeout},
};

// a very quick async task, but might timeout
async fn quick_job() -> usize {
1
}

fn build_run_time(workers: usize) -> Runtime {
if workers == 1 {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
} else {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(workers)
.build()
.unwrap()
}
}

fn single_thread_scheduler_timeout(c: &mut Criterion) {
do_timeout_test(c, 1, "single_thread_timeout");
}

fn multi_thread_scheduler_timeout(c: &mut Criterion) {
do_timeout_test(c, 8, "multi_thread_timeout-8");
}

fn do_timeout_test(c: &mut Criterion, workers: usize, name: &str) {
let runtime = build_run_time(workers);
c.bench_function(name, |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(spawn_timeout_job(iters as usize, workers).await);
});
start.elapsed()
})
});
}

async fn spawn_timeout_job(iters: usize, procs: usize) {
let mut handles = Vec::with_capacity(procs);
for _ in 0..procs {
handles.push(tokio::spawn(async move {
for _ in 0..iters / procs {
let h = timeout(Duration::from_secs(1), quick_job());
assert_eq!(black_box(h.await.unwrap()), 1);
}
}));
}
for handle in handles {
handle.await.unwrap();
}
}

fn single_thread_scheduler_sleep(c: &mut Criterion) {
do_sleep_test(c, 1, "single_thread_sleep");
}

fn multi_thread_scheduler_sleep(c: &mut Criterion) {
do_sleep_test(c, 8, "multi_thread_sleep-8");
}

fn do_sleep_test(c: &mut Criterion, workers: usize, name: &str) {
let runtime = build_run_time(workers);

c.bench_function(name, |b| {
b.iter_custom(|iters| {
let start = Instant::now();
runtime.block_on(async {
black_box(spawn_sleep_job(iters as usize, workers).await);
});
start.elapsed()
})
});
}

async fn spawn_sleep_job(iters: usize, procs: usize) {
let mut handles = Vec::with_capacity(procs);
for _ in 0..procs {
handles.push(tokio::spawn(async move {
for _ in 0..iters / procs {
let _h = black_box(sleep(Duration::from_secs(1)));
}
}));
}
for handle in handles {
handle.await.unwrap();
}
}

criterion_group!(
timeout_benchmark,
single_thread_scheduler_timeout,
multi_thread_scheduler_timeout,
single_thread_scheduler_sleep,
multi_thread_scheduler_sleep
);

criterion_main!(timeout_benchmark);
34 changes: 24 additions & 10 deletions tokio/src/runtime/time/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ pub(crate) struct TimerEntry {
///
/// This is manipulated only under the inner mutex. TODO: Can we use loom
/// cells for this?
inner: StdUnsafeCell<TimerShared>,
inner: StdUnsafeCell<Option<TimerShared>>,
/// Deadline for the timer. This is used to register on the first
/// poll, as we can't register prior to being pinned.
deadline: Instant,
Expand Down Expand Up @@ -469,35 +469,48 @@ unsafe impl linked_list::Link for TimerShared {

impl TimerEntry {
#[track_caller]
pub(crate) fn new(handle: &scheduler::Handle, deadline: Instant) -> Self {
pub(crate) fn new(handle: scheduler::Handle, deadline: Instant) -> Self {
// Panic if the time driver is not enabled
let _ = handle.driver().time();

let driver = handle.clone();

Self {
driver,
inner: StdUnsafeCell::new(TimerShared::new()),
driver: handle,
inner: StdUnsafeCell::new(None),
deadline,
registered: false,
_m: std::marker::PhantomPinned,
}
}

fn is_inner_init(&self) -> bool {
unsafe { &*self.inner.get() }.is_some()
}

// This lazy initialization is for performance purposes.
fn inner(&self) -> &TimerShared {
unsafe { &*self.inner.get() }
let inner = unsafe { &*self.inner.get() };
if inner.is_none() {
unsafe {
*self.inner.get() = Some(TimerShared::new());
}
}
return inner.as_ref().unwrap();
}

pub(crate) fn deadline(&self) -> Instant {
self.deadline
}

pub(crate) fn is_elapsed(&self) -> bool {
!self.inner().state.might_be_registered() && self.registered
self.is_inner_init() && !self.inner().state.might_be_registered() && self.registered
}

/// Cancels and deregisters the timer. This operation is irreversible.
pub(crate) fn cancel(self: Pin<&mut Self>) {
// Avoid calling the `clear_entry` method, because it has not been initialized yet.
if !self.is_inner_init() {
return;
}
// We need to perform an acq/rel fence with the driver thread, and the
// simplest way to do so is to grab the driver lock.
//
Expand All @@ -524,8 +537,9 @@ impl TimerEntry {
}

pub(crate) fn reset(mut self: Pin<&mut Self>, new_time: Instant, reregister: bool) {
unsafe { self.as_mut().get_unchecked_mut() }.deadline = new_time;
unsafe { self.as_mut().get_unchecked_mut() }.registered = reregister;
let this = unsafe { self.as_mut().get_unchecked_mut() };
this.deadline = new_time;
this.registered = reregister;

let tick = self.driver().time_source().deadline_to_tick(new_time);

Expand Down
12 changes: 6 additions & 6 deletions tokio/src/runtime/time/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn single_timer() {
let handle_ = handle.clone();
let jh = thread::spawn(move || {
let entry = TimerEntry::new(
&handle_.inner,
handle_.inner.clone(),
handle_.inner.driver().clock().now() + Duration::from_secs(1),
);
pin!(entry);
Expand Down Expand Up @@ -83,7 +83,7 @@ fn drop_timer() {
let handle_ = handle.clone();
let jh = thread::spawn(move || {
let entry = TimerEntry::new(
&handle_.inner,
handle_.inner.clone(),
handle_.inner.driver().clock().now() + Duration::from_secs(1),
);
pin!(entry);
Expand Down Expand Up @@ -117,7 +117,7 @@ fn change_waker() {
let handle_ = handle.clone();
let jh = thread::spawn(move || {
let entry = TimerEntry::new(
&handle_.inner,
handle_.inner.clone(),
handle_.inner.driver().clock().now() + Duration::from_secs(1),
);
pin!(entry);
Expand Down Expand Up @@ -157,7 +157,7 @@ fn reset_future() {
let start = handle.inner.driver().clock().now();

let jh = thread::spawn(move || {
let entry = TimerEntry::new(&handle_.inner, start + Duration::from_secs(1));
let entry = TimerEntry::new(handle_.inner.clone(), start + Duration::from_secs(1));
pin!(entry);

let _ = entry
Expand Down Expand Up @@ -219,7 +219,7 @@ fn poll_process_levels() {

for i in 0..normal_or_miri(1024, 64) {
let mut entry = Box::pin(TimerEntry::new(
&handle.inner,
handle.inner.clone(),
handle.inner.driver().clock().now() + Duration::from_millis(i),
));

Expand Down Expand Up @@ -253,7 +253,7 @@ fn poll_process_levels_targeted() {
let handle = rt.handle();

let e1 = TimerEntry::new(
&handle.inner,
handle.inner.clone(),
handle.inner.driver().clock().now() + Duration::from_millis(193),
);
pin!(e1);
Expand Down
5 changes: 2 additions & 3 deletions tokio/src/time/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,11 @@ impl Sleep {
location: Option<&'static Location<'static>>,
) -> Sleep {
use crate::runtime::scheduler;

let handle = scheduler::Handle::current();
let entry = TimerEntry::new(&handle, deadline);

let entry = TimerEntry::new(handle, deadline);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let inner = {
let handle = scheduler::Handle::current();
let clock = handle.driver().clock();
let handle = &handle.driver().time();
let time_source = handle.time_source();
Expand Down

0 comments on commit f6eb1ee

Please sign in to comment.