Skip to content

Commit

Permalink
io: speed-up waking by using uninitialized array (#4055)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gleb Pomykalov authored Aug 25, 2021
1 parent 897fed1 commit 51f4f05
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 19 deletions.
27 changes: 8 additions & 19 deletions tokio/src/io/driver/scheduled_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
use crate::util::slab::Entry;
use crate::util::WakeList;

use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::{Context, Poll, Waker};
Expand Down Expand Up @@ -212,10 +213,7 @@ impl ScheduledIo {
}

fn wake0(&self, ready: Ready, shutdown: bool) {
const NUM_WAKERS: usize = 32;

let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
let mut curr = 0;
let mut wakers = WakeList::new();

let mut waiters = self.waiters.lock();

Expand All @@ -224,32 +222,29 @@ impl ScheduledIo {
// check for AsyncRead slot
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
wakers[curr] = Some(waker);
curr += 1;
wakers.push(waker);
}
}

// check for AsyncWrite slot
if ready.is_writable() {
if let Some(waker) = waiters.writer.take() {
wakers[curr] = Some(waker);
curr += 1;
wakers.push(waker);
}
}

#[cfg(feature = "net")]
'outer: loop {
let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

while curr < NUM_WAKERS {
while wakers.can_push() {
match iter.next() {
Some(waiter) => {
let waiter = unsafe { &mut *waiter.as_ptr() };

if let Some(waker) = waiter.waker.take() {
waiter.is_ready = true;
wakers[curr] = Some(waker);
curr += 1;
wakers.push(waker);
}
}
None => {
Expand All @@ -260,11 +255,7 @@ impl ScheduledIo {

drop(waiters);

for waker in wakers.iter_mut().take(curr) {
waker.take().unwrap().wake();
}

curr = 0;
wakers.wake_all();

// Acquire the lock again.
waiters = self.waiters.lock();
Expand All @@ -273,9 +264,7 @@ impl ScheduledIo {
// Release the lock before notifying
drop(waiters);

for waker in wakers.iter_mut().take(curr) {
waker.take().unwrap().wake();
}
wakers.wake_all();
}

pub(super) fn ready_event(&self, interest: Interest) -> ReadyEvent {
Expand Down
3 changes: 3 additions & 0 deletions tokio/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
cfg_io_driver! {
pub(crate) mod bit;
pub(crate) mod slab;

mod wake_list;
pub(crate) use wake_list::WakeList;
}

#[cfg(any(
Expand Down
47 changes: 47 additions & 0 deletions tokio/src/util/wake_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use core::mem::MaybeUninit;
use core::ptr;
use std::task::Waker;

const NUM_WAKERS: usize = 32;

pub(crate) struct WakeList {
inner: [MaybeUninit<Waker>; NUM_WAKERS],
curr: usize,
}

impl WakeList {
pub(crate) fn new() -> Self {
Self {
inner: unsafe { MaybeUninit::uninit().assume_init() },
curr: 0,
}
}

#[inline]
pub(crate) fn can_push(&self) -> bool {
self.curr < NUM_WAKERS
}

pub(crate) fn push(&mut self, val: Waker) {
debug_assert!(self.can_push());

self.inner[self.curr] = MaybeUninit::new(val);
self.curr += 1;
}

pub(crate) fn wake_all(&mut self) {
assert!(self.curr <= NUM_WAKERS);
while self.curr > 0 {
self.curr -= 1;
let waker = unsafe { ptr::read(self.inner[self.curr].as_mut_ptr()) };
waker.wake();
}
}
}

impl Drop for WakeList {
fn drop(&mut self) {
let slice = ptr::slice_from_raw_parts_mut(self.inner.as_mut_ptr() as *mut Waker, self.curr);
unsafe { ptr::drop_in_place(slice) };
}
}

3 comments on commit 51f4f05

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'sync_mpsc'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 51f4f05 Previous: d0305d5 Ratio
send_large 62527 ns/iter (± 28976) 26175 ns/iter (± 5653) 2.39

This comment was automatically generated by workflow using github-action-benchmark.

CC: @tokio-rs/maintainers

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'sync_mpsc'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 51f4f05 Previous: d0305d5 Ratio
send_large 58252 ns/iter (± 9867) 26175 ns/iter (± 5653) 2.23

This comment was automatically generated by workflow using github-action-benchmark.

CC: @tokio-rs/maintainers

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'sync_mpsc'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 51f4f05 Previous: d0305d5 Ratio
send_large 58576 ns/iter (± 4887) 26175 ns/iter (± 5653) 2.24

This comment was automatically generated by workflow using github-action-benchmark.

CC: @tokio-rs/maintainers

Please sign in to comment.