From e6a171035bef32eb12b7725e29eeb824dc3a1dc1 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Tue, 4 Jan 2022 19:10:37 +0000 Subject: [PATCH 1/9] io: make `EPOLLERR` awake `AsyncFd::readable` Currently, if a user is awaiting on a `AsyncFd` becoming readable and we get a polling error (which may happen in some cases, see the referenced issue), the awaiter will never be woken. Ideally, they would be notified of this error in some way. The solution here involves communicating to the `Readiness` object that there was an error, and when that happens, the `Readiness` future will return a synthetic error that boils down to "there was a polling error". If the user wants to find more information about the underlying error, they would have to find some other way (not clear how yet? as pointed out in the issue, for sockets one could use `SO_ERROR`). --- tokio/src/io/driver/ready.rs | 24 ++++++++ tokio/src/io/driver/registration.rs | 2 +- tokio/src/io/driver/scheduled_io.rs | 88 +++++++++++++++++++++-------- 3 files changed, 89 insertions(+), 25 deletions(-) diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs index 2430d3022f1..aa236aadcd1 100644 --- a/tokio/src/io/driver/ready.rs +++ b/tokio/src/io/driver/ready.rs @@ -7,6 +7,7 @@ const READABLE: usize = 0b0_01; const WRITABLE: usize = 0b0_10; const READ_CLOSED: usize = 0b0_0100; const WRITE_CLOSED: usize = 0b0_1000; +const ERROR: usize = 0b1_0000; /// Describes the readiness state of an I/O resources. /// @@ -31,6 +32,9 @@ impl Ready { /// Returns a `Ready` representing write closed readiness. pub const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED); + /// Returns a `Ready` representing error readiness + pub const ERROR: Ready = Ready(ERROR); + /// Returns a `Ready` representing readiness for all operations. pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED); @@ -65,6 +69,10 @@ impl Ready { ready |= Ready::WRITE_CLOSED; } + if event.is_error() { + ready |= Ready::ERROR; + } + ready } @@ -144,6 +152,21 @@ impl Ready { self.contains(Ready::WRITE_CLOSED) } + /// Returns `true` if the value includes write-closed `readiness`. + /// + /// # Examples + /// + /// ``` + /// use tokio::io::Ready; + /// + /// assert!(!Ready::EMPTY.is_write_closed()); + /// assert!(!Ready::WRITABLE.is_write_closed()); + /// assert!(Ready::WRITE_CLOSED.is_write_closed()); + /// ``` + pub fn is_error(self) -> bool { + self.contains(Ready::ERROR) + } + /// Returns true if `self` is a superset of `other`. /// /// `other` may represent more than one readiness operations, in which case @@ -245,6 +268,7 @@ impl fmt::Debug for Ready { .field("is_writable", &self.is_writable()) .field("is_read_closed", &self.is_read_closed()) .field("is_write_closed", &self.is_write_closed()) + .field("is_error", &self.is_error()) .finish() } } diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index 7350be6345d..e72eb0e4693 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -242,7 +242,7 @@ cfg_io_readiness! { ))); } - Pin::new(&mut fut).poll(cx).map(Ok) + Pin::new(&mut fut).poll(cx) }).await } diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 76f93431ba2..248ac5973dd 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -60,7 +60,13 @@ cfg_io_readiness! { /// The interest this waiter is waiting on. interest: Interest, - is_ready: bool, + // TODO: the Readiness object can figure out this information by reading + // the ScheduledIO `readiness` value (and in fact it does in its future + // impl, before putting the waiter in the waitlist.). Why do we need + // this extra field here? Is it because once we have locked the waitlist + // (which we have to do anyway) it's cheaper to read this value than to + // atomically read the `readiness` of the ScheduledIO? + wakeup_type: Option, /// Should never be `!Unpin`. _p: PhantomPinned, @@ -81,6 +87,12 @@ cfg_io_readiness! { Waiting, Done, } + + #[derive(Debug)] + enum WakeupType { + Ready, + PollError + } } // The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. @@ -220,7 +232,8 @@ impl ScheduledIo { waiters.is_shutdown |= shutdown; // check for AsyncRead slot - if ready.is_readable() { + if ready.is_readable() || ready.is_error() { + // TODO: figure out what to do for the AsyncRead and AsyncWrite slots if let Some(waker) = waiters.reader.take() { wakers.push(waker); } @@ -235,15 +248,23 @@ impl ScheduledIo { #[cfg(feature = "net")] 'outer: loop { - let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest)); - + let is_error = ready.is_error(); + // should we wakeup all waiters in case of error? I guess that + // depends on the semantics of EPOLLERR but I don't know yet + let mut iter = waiters + .list + .drain_filter(|w| ready.satisfies(w.interest) || is_error); while wakers.can_push() { match iter.next() { Some(waiter) => { let waiter = unsafe { &mut *waiter.as_ptr() }; if let Some(waker) = waiter.waker.take() { - waiter.is_ready = true; + waiter.wakeup_type = if is_error { + Some(WakeupType::PollError) + } else { + Some(WakeupType::Ready) + }; wakers.push(waker); } } @@ -263,7 +284,6 @@ impl ScheduledIo { // Release the lock before notifying drop(waiters); - wakers.wake_all(); } @@ -364,7 +384,7 @@ unsafe impl Sync for ScheduledIo {} cfg_io_readiness! { impl ScheduledIo { /// An async version of `poll_readiness` which uses a linked list of wakers. - pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent { + pub(crate) async fn readiness(&self, interest: Interest) -> std::io::Result { self.readiness_fut(interest).await } @@ -379,7 +399,7 @@ cfg_io_readiness! { waiter: UnsafeCell::new(Waiter { pointers: linked_list::Pointers::new(), waker: None, - is_ready: false, + wakeup_type: None, interest, _p: PhantomPinned, }), @@ -407,7 +427,7 @@ cfg_io_readiness! { // ===== impl Readiness ===== impl Future for Readiness<'_> { - type Output = ReadyEvent; + type Output = std::io::Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { use std::sync::atomic::Ordering::SeqCst; @@ -416,7 +436,6 @@ cfg_io_readiness! { let me = self.get_unchecked_mut(); (&me.scheduled_io, &mut me.state, &me.waiter) }; - loop { match *state { State::Init => { @@ -424,6 +443,13 @@ cfg_io_readiness! { let curr = scheduled_io.readiness.load(SeqCst); let ready = Ready::from_usize(READINESS.unpack(curr)); + // the information is read via the `curr` in the scheduled io + if ready.is_error() { + return Poll::Ready(Err( + std::io::Error::new(std::io::ErrorKind::Other, "Polling error") + )); + } + // Safety: `waiter.interest` never changes let interest = unsafe { (*waiter.get()).interest }; let ready = ready.intersection(interest); @@ -432,7 +458,7 @@ cfg_io_readiness! { // Currently ready! let tick = TICK.unpack(curr) as u8; *state = State::Done; - return Poll::Ready(ReadyEvent { tick, ready }); + return Poll::Ready(Ok(ReadyEvent { tick, ready })); } // Wasn't ready, take the lock (and check again while locked). @@ -451,7 +477,7 @@ cfg_io_readiness! { // Currently ready! let tick = TICK.unpack(curr) as u8; *state = State::Done; - return Poll::Ready(ReadyEvent { tick, ready }); + return Poll::Ready(Ok(ReadyEvent { tick, ready })); } // Not ready even after locked, insert into list... @@ -480,18 +506,28 @@ cfg_io_readiness! { // Safety: called while locked let w = unsafe { &mut *waiter.get() }; - if w.is_ready { - // Our waker has been notified. - *state = State::Done; - } else { - // Update the waker, if necessary. - if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { - w.waker = Some(cx.waker().clone()); + match w.wakeup_type { + Some(WakeupType::Ready) => { + // Our waker has been notified. + *state = State::Done; + }, + Some(WakeupType::PollError) => { + return Poll::Ready(Err( + std::io::Error::new( + std::io::ErrorKind::Other, + "Polling error" + ) + )); + }, + None => { + // Update the waker, if necessary. + if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { + w.waker = Some(cx.waker().clone()); + } + + return Poll::Pending; } - - return Poll::Pending; } - // Explicit drop of the lock to indicate the scope that the // lock is held. Because holding the lock is required to // ensure safe access to fields not held within the lock, it @@ -503,12 +539,16 @@ cfg_io_readiness! { let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8; // Safety: State::Done means it is no longer shared + // TODO: I'm not sure why this safety claim is true. What about + // it being `Done` prevents it from being shared? As far as I can tell + // the `wake0` method in the scheduledio doesn't care about this being + // Done? let w = unsafe { &mut *waiter.get() }; - return Poll::Ready(ReadyEvent { + return Poll::Ready(Ok(ReadyEvent { tick, ready: Ready::from_interest(w.interest), - }); + })); } } } From 15306044f07c326759f16e381d0c0b927e6b6c66 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Mon, 31 Jan 2022 23:51:35 +0000 Subject: [PATCH 2/9] communicate polling errors to AsyncRead users I've added some throwaway code to implement this functionality, which I already implemented for `readable()`. I haven't been able to test it yet, but that's the next thing I plan to do. After coming up with a satisfactory testing story, I will think through each of the TODOs one by one --- tokio/src/io/driver/registration.rs | 3 +- tokio/src/io/driver/scheduled_io.rs | 44 +++++++++++++++++++++-------- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index e72eb0e4693..1e9d61c7d43 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -155,7 +155,8 @@ impl Registration { ) -> Poll> { // Keep track of task budget let coop = ready!(crate::coop::poll_proceed(cx)); - let ev = ready!(self.shared.poll_readiness(cx, direction)); + // TODO: should this "coop.made_progress()" though? + let ev = ready!(self.shared.poll_readiness(cx, direction))?; if self.handle.inner().is_none() { return Poll::Ready(Err(gone())); diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 248ac5973dd..ad5d99a2b13 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -5,6 +5,7 @@ use crate::util::bit; use crate::util::slab::Entry; use crate::util::WakeList; +use std::io; use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::task::{Context, Poll, Waker}; @@ -233,7 +234,6 @@ impl ScheduledIo { // check for AsyncRead slot if ready.is_readable() || ready.is_error() { - // TODO: figure out what to do for the AsyncRead and AsyncWrite slots if let Some(waker) = waiters.reader.take() { wakers.push(waker); } @@ -249,8 +249,6 @@ impl ScheduledIo { #[cfg(feature = "net")] 'outer: loop { let is_error = ready.is_error(); - // should we wakeup all waiters in case of error? I guess that - // depends on the semantics of EPOLLERR but I don't know yet let mut iter = waiters .list .drain_filter(|w| ready.satisfies(w.interest) || is_error); @@ -305,10 +303,21 @@ impl ScheduledIo { &self, cx: &mut Context<'_>, direction: Direction, - ) -> Poll { + ) -> Poll> { let curr = self.readiness.load(Acquire); - let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); + let ready = Ready::from_usize(READINESS.unpack(curr)); + if ready.is_error() { + // TODO: perhaps we want to do something different if direction == Write, + // because ready.is_error() implies ready.is_write_closed(), which was + // already handled in this function + return Poll::Ready(Err(std::io::Error::new( + io::ErrorKind::Other, + "Polling error", + ))); + } + + let ready = direction.mask() & ready; if ready.is_empty() { // Update the task info @@ -334,25 +343,36 @@ impl ScheduledIo { // Try again, in case the readiness was changed while we were // taking the waiters lock let curr = self.readiness.load(Acquire); - let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); + + let ready = Ready::from_usize(READINESS.unpack(curr)); + if ready.is_error() { + // TODO: think about the relation between `waiters.is_shutdown` + // and this + return Poll::Ready(Err(std::io::Error::new( + io::ErrorKind::Other, + "Polling error", + ))); + } + + let ready = direction.mask() & ready; if waiters.is_shutdown { - Poll::Ready(ReadyEvent { + Poll::Ready(Ok(ReadyEvent { tick: TICK.unpack(curr) as u8, ready: direction.mask(), - }) + })) } else if ready.is_empty() { Poll::Pending } else { - Poll::Ready(ReadyEvent { + Poll::Ready(Ok(ReadyEvent { tick: TICK.unpack(curr) as u8, ready, - }) + })) } } else { - Poll::Ready(ReadyEvent { + Poll::Ready(Ok(ReadyEvent { tick: TICK.unpack(curr) as u8, ready, - }) + })) } } From 2928461edf6b9d6b68f3c836b70c2736b6930d3c Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Sat, 5 Feb 2022 11:24:15 +0000 Subject: [PATCH 3/9] include `ERROR` in `ALL` I also include the code for a couple of manual tests I've introduced to verify my changes work as expected. Ideally we would have good automated unit tests but I haven't found reliable ways to reproduce `EPOLLERR` that do not require `root` --- examples/Cargo.toml | 9 ++++++ examples/fuse.rs | 63 ++++++++++++++++++++++++++++++++++++ examples/fuse2.rs | 62 +++++++++++++++++++++++++++++++++++ tokio/src/io/driver/ready.rs | 2 +- 4 files changed, 135 insertions(+), 1 deletion(-) create mode 100644 examples/fuse.rs create mode 100644 examples/fuse2.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index d2aca69d84a..4d0eeee35e4 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -23,6 +23,7 @@ httparse = "1.0" httpdate = "1.0" once_cell = "1.5.2" rand = "0.8.3" +nix = "0.23" [target.'cfg(windows)'.dev-dependencies.winapi] version = "0.3.8" @@ -91,3 +92,11 @@ path = "named-pipe-ready.rs" [[example]] name = "named-pipe-multi-client" path = "named-pipe-multi-client.rs" + +[[example]] +name = "fuse" +path = "fuse.rs" + +[[example]] +name = "fuse2" +path = "fuse2.rs" diff --git a/examples/fuse.rs b/examples/fuse.rs new file mode 100644 index 00000000000..b03a7ff9ac2 --- /dev/null +++ b/examples/fuse.rs @@ -0,0 +1,63 @@ +// This "test" is based on the code sample in #3442 +// It is meant to test that `readable()` will surface the +// polling error to the caller. +// This program has to be run as `root` to be able to use +// fuse. +// The expected output is a polling error along the lines of: +// ``` +// thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "Polling error" }', examples/fuse.rs:44:14 +// ``` +// This is just to show what testing I'm doing, and I don't intend +// to get it merged +use tokio::io::{unix::AsyncFd, Interest}; + +use nix::{ + fcntl::{fcntl, open, FcntlArg, OFlag}, + mount::{mount, umount, MsFlags}, + sys::stat::Mode, + unistd::read, +}; + +#[tokio::main(flavor = "current_thread")] +async fn main() { + let fuse = open("/dev/fuse", OFlag::O_RDWR, Mode::empty()).unwrap(); + let fcntl_arg = FcntlArg::F_SETFL(OFlag::O_NONBLOCK | OFlag::O_LARGEFILE); + fcntl(fuse, fcntl_arg).unwrap(); + + let options = format!( + "fd={},user_id=0,group_id=0,allow_other,rootmode=40000", + fuse + ); + + mount( + Some("test"), + "/mnt", + Some("fuse"), + MsFlags::empty(), + Some(options.as_str()), + ) + .unwrap(); + + let async_fd = AsyncFd::with_interest(fuse, Interest::READABLE).unwrap(); + + std::thread::spawn(|| { + std::thread::sleep(std::time::Duration::from_secs(2)); + umount("/mnt").unwrap(); + }); + + let mut buffer = [0; 8192]; + loop { + let result = async_fd + .readable() + .await + .unwrap() + .try_io(|_| read(fuse, &mut buffer).map_err(Into::into)); + + match result { + Err(_) => continue, + Ok(result) => { + result.unwrap(); + } + } + } +} diff --git a/examples/fuse2.rs b/examples/fuse2.rs new file mode 100644 index 00000000000..2e663dda0ef --- /dev/null +++ b/examples/fuse2.rs @@ -0,0 +1,62 @@ +// This "test" is based on the code sample in #3442 +// It is meant to test that `poll_read_ready()` will surface the +// polling error to the caller. +// This program has to be run as `root` to be able to use +// fuse. +// The expected output is a polling error along the lines of: +// ``` +// thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Custom { kind: Other, error: "Polling error" }', examples/fuse.rs:44:14 +// ``` +// This is just to show what testing I'm doing, and I don't intend +// to get it merged +use tokio::io::{unix::AsyncFd, Interest}; + +use nix::{ + fcntl::{fcntl, open, FcntlArg, OFlag}, + mount::{mount, umount, MsFlags}, + sys::stat::Mode, + unistd::read, +}; + +#[tokio::main(flavor = "current_thread")] +async fn main() { + let fuse = open("/dev/fuse", OFlag::O_RDWR, Mode::empty()).unwrap(); + let fcntl_arg = FcntlArg::F_SETFL(OFlag::O_NONBLOCK | OFlag::O_LARGEFILE); + fcntl(fuse, fcntl_arg).unwrap(); + + let options = format!( + "fd={},user_id=0,group_id=0,allow_other,rootmode=40000", + fuse + ); + + mount( + Some("test"), + "/mnt", + Some("fuse"), + MsFlags::empty(), + Some(options.as_str()), + ) + .unwrap(); + + let async_fd = AsyncFd::with_interest(fuse, Interest::READABLE).unwrap(); + + std::thread::spawn(|| { + std::thread::sleep(std::time::Duration::from_secs(1)); + umount("/mnt").unwrap(); + }); + + let mut buffer = [0; 8192]; + loop { + let result = futures::future::poll_fn(|cx| async_fd.poll_read_ready(cx)) + .await + .unwrap() + .try_io(|_| read(fuse, &mut buffer).map_err(Into::into)); + + match result { + Err(_) => continue, + Ok(result) => { + result.unwrap(); + } + } + } +} diff --git a/tokio/src/io/driver/ready.rs b/tokio/src/io/driver/ready.rs index aa236aadcd1..9c1d7535dc8 100644 --- a/tokio/src/io/driver/ready.rs +++ b/tokio/src/io/driver/ready.rs @@ -36,7 +36,7 @@ impl Ready { pub const ERROR: Ready = Ready(ERROR); /// Returns a `Ready` representing readiness for all operations. - pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED); + pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED | ERROR); // Must remain crate-private to avoid adding a public dependency on Mio. pub(crate) fn from_mio(event: &mio::event::Event) -> Ready { From 6dbe35c3c5bc6ed3b1b915acbddc04ca86081b40 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Sat, 5 Feb 2022 11:36:56 +0000 Subject: [PATCH 4/9] clean a couple todos --- tokio/src/io/driver/scheduled_io.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index ad5d99a2b13..5cf37ff259f 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -307,10 +307,10 @@ impl ScheduledIo { let curr = self.readiness.load(Acquire); let ready = Ready::from_usize(READINESS.unpack(curr)); - if ready.is_error() { - // TODO: perhaps we want to do something different if direction == Write, - // because ready.is_error() implies ready.is_write_closed(), which was - // already handled in this function + + if direction != Direction::Write && ready.is_error() { + // We only respond to errors for reads because in mio, an error + // also implies `is_write_closed`, and will therefore awake the writer. return Poll::Ready(Err(std::io::Error::new( io::ErrorKind::Other, "Polling error", @@ -345,9 +345,7 @@ impl ScheduledIo { let curr = self.readiness.load(Acquire); let ready = Ready::from_usize(READINESS.unpack(curr)); - if ready.is_error() { - // TODO: think about the relation between `waiters.is_shutdown` - // and this + if direction != Direction::Write && ready.is_error() { return Poll::Ready(Err(std::io::Error::new( io::ErrorKind::Other, "Polling error", @@ -356,6 +354,9 @@ impl ScheduledIo { let ready = direction.mask() & ready; if waiters.is_shutdown { + // TODO: why does this return a `ReadyEvent`? Why + // not make it return an error? It would have to be a + // custom error though... Poll::Ready(Ok(ReadyEvent { tick: TICK.unpack(curr) as u8, ready: direction.mask(), From b61e1761278b73694e8afc95c09b0bed1c8425a2 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Sat, 5 Feb 2022 11:42:50 +0000 Subject: [PATCH 5/9] cleanup poll_readiness a bit Added an early return that simplifies things a bit IMO and simplified an if/else if/else branch below --- tokio/src/io/driver/scheduled_io.rs | 86 ++++++++++++++--------------- 1 file changed, 40 insertions(+), 46 deletions(-) diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 5cf37ff259f..b1c63968817 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -319,61 +319,55 @@ impl ScheduledIo { let ready = direction.mask() & ready; - if ready.is_empty() { - // Update the task info - let mut waiters = self.waiters.lock(); - let slot = match direction { - Direction::Read => &mut waiters.reader, - Direction::Write => &mut waiters.writer, - }; + if !ready.is_empty() { + return Poll::Ready(Ok(ReadyEvent { + tick: TICK.unpack(curr) as u8, + ready, + })); + } - // Avoid cloning the waker if one is already stored that matches the - // current task. - match slot { - Some(existing) => { - if !existing.will_wake(cx.waker()) { - *existing = cx.waker().clone(); - } - } - None => { - *slot = Some(cx.waker().clone()); + // Update the task info + let mut waiters = self.waiters.lock(); + let slot = match direction { + Direction::Read => &mut waiters.reader, + Direction::Write => &mut waiters.writer, + }; + + // Avoid cloning the waker if one is already stored that matches the + // current task. + match slot { + Some(existing) => { + if !existing.will_wake(cx.waker()) { + *existing = cx.waker().clone(); } } + None => { + *slot = Some(cx.waker().clone()); + } + } - // Try again, in case the readiness was changed while we were - // taking the waiters lock - let curr = self.readiness.load(Acquire); + // Try again, in case the readiness was changed while we were + // taking the waiters lock + let curr = self.readiness.load(Acquire); - let ready = Ready::from_usize(READINESS.unpack(curr)); - if direction != Direction::Write && ready.is_error() { - return Poll::Ready(Err(std::io::Error::new( - io::ErrorKind::Other, - "Polling error", - ))); - } + let ready = Ready::from_usize(READINESS.unpack(curr)); + if direction != Direction::Write && ready.is_error() { + return Poll::Ready(Err(std::io::Error::new( + io::ErrorKind::Other, + "Polling error", + ))); + } - let ready = direction.mask() & ready; - if waiters.is_shutdown { - // TODO: why does this return a `ReadyEvent`? Why - // not make it return an error? It would have to be a - // custom error though... - Poll::Ready(Ok(ReadyEvent { - tick: TICK.unpack(curr) as u8, - ready: direction.mask(), - })) - } else if ready.is_empty() { - Poll::Pending - } else { - Poll::Ready(Ok(ReadyEvent { - tick: TICK.unpack(curr) as u8, - ready, - })) - } - } else { + let ready = direction.mask() & ready; + if waiters.is_shutdown || !ready.is_empty() { + // TODO: why return a `ReadyEvent` in shutdown? Why not make it + // return an error? It would have to be a custom error though... Poll::Ready(Ok(ReadyEvent { tick: TICK.unpack(curr) as u8, - ready, + ready: direction.mask(), })) + } else { + Poll::Pending } } From fef2db9863f8b137935aed630e9abde3d8da5680 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Sat, 5 Feb 2022 11:49:33 +0000 Subject: [PATCH 6/9] coop: make polling errors count as work --- tokio/src/io/driver/registration.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index 1e9d61c7d43..4305bb15f9f 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -155,15 +155,16 @@ impl Registration { ) -> Poll> { // Keep track of task budget let coop = ready!(crate::coop::poll_proceed(cx)); - // TODO: should this "coop.made_progress()" though? - let ev = ready!(self.shared.poll_readiness(cx, direction))?; + let ev = ready!(self.shared.poll_readiness(cx, direction)); if self.handle.inner().is_none() { return Poll::Ready(Err(gone())); } + // Regardless of whether we got a polling error or an actual + // `ReadyEvent`, count it as work having been done coop.made_progress(); - Poll::Ready(Ok(ev)) + Poll::Ready(Ok(ev?)) } fn poll_io( From 4eb6df499afbf67e35592593bcaf63c04182fd2e Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Mon, 7 Feb 2022 20:46:33 +0000 Subject: [PATCH 7/9] add one unit test --- tokio/src/io/driver/scheduled_io.rs | 42 ++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index b1c63968817..061745a03e3 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -185,7 +185,6 @@ impl ScheduledIo { // function doesn't see them. let current_readiness = Ready::from_usize(current); let new = f(current_readiness); - let packed = match tick { Tick::Set(t) => TICK.pack(t as usize, new.as_usize()), Tick::Clear(t) => { @@ -586,3 +585,44 @@ cfg_io_readiness! { unsafe impl Send for Readiness<'_> {} unsafe impl Sync for Readiness<'_> {} } + +#[cfg(test)] +mod tests { + + use super::*; + + fn unwrap_ready(p: Poll) -> T { + match p { + Poll::Pending => panic!("Expected a Ready"), + Poll::Ready(t) => t, + } + } + + #[test] + fn test_setting_error_awakes_poll_read_readiness() { + // GIVEN + let scheduled_io: ScheduledIo = Default::default(); + + // WHEN: marking an error and polling readiness + scheduled_io.set_readiness( + None, + Tick::Set(0), + |_| Ready::ERROR + ).unwrap(); + + let mut task = tokio_test::task::spawn(async { + let readiness = crate::future::poll_fn(|cx| { + scheduled_io.poll_readiness(cx, Direction::Read) + }).await; + + readiness + }); + + // THEN + let readiness = unwrap_ready(task.poll()); + assert_eq!( + readiness.unwrap_err().kind(), + std::io::ErrorKind::Other + ); + } +} From 54a2fe476fda87fd3de0cead0ed389a8bfe76d2b Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Sun, 20 Feb 2022 16:33:30 +0000 Subject: [PATCH 8/9] add a couple of integration tests --- tokio/Cargo.toml | 3 ++ tokio/src/io/driver/scheduled_io.rs | 18 ++++------- tokio/tests/io_poll_error.rs | 47 +++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 12 deletions(-) create mode 100644 tokio/tests/io_poll_error.rs diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 601fc2af7f2..4071f9f7d57 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -142,6 +142,9 @@ wasm-bindgen-test = "0.3.0" [target.'cfg(target_os = "freebsd")'.dev-dependencies] mio-aio = { version = "0.6.0", features = ["tokio"] } +[target.'cfg(target_os = "linux")'.dev-dependencies] +userfaultfd = "0.4" + [target.'cfg(loom)'.dev-dependencies] loom = { version = "0.5", features = ["futures", "checkpoint"] } diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 061745a03e3..32e80bb998b 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -604,25 +604,19 @@ mod tests { let scheduled_io: ScheduledIo = Default::default(); // WHEN: marking an error and polling readiness - scheduled_io.set_readiness( - None, - Tick::Set(0), - |_| Ready::ERROR - ).unwrap(); + scheduled_io + .set_readiness(None, Tick::Set(0), |_| Ready::ERROR) + .unwrap(); let mut task = tokio_test::task::spawn(async { - let readiness = crate::future::poll_fn(|cx| { - scheduled_io.poll_readiness(cx, Direction::Read) - }).await; + let readiness = + crate::future::poll_fn(|cx| scheduled_io.poll_readiness(cx, Direction::Read)).await; readiness }); // THEN let readiness = unwrap_ready(task.poll()); - assert_eq!( - readiness.unwrap_err().kind(), - std::io::ErrorKind::Other - ); + assert_eq!(readiness.unwrap_err().kind(), std::io::ErrorKind::Other); } } diff --git a/tokio/tests/io_poll_error.rs b/tokio/tests/io_poll_error.rs new file mode 100644 index 00000000000..95799cbba32 --- /dev/null +++ b/tokio/tests/io_poll_error.rs @@ -0,0 +1,47 @@ +#[cfg(target_os = "linux")] +use tokio::io::{unix::AsyncFd, Interest}; +use userfaultfd::{Uffd, UffdBuilder}; + +fn get_blocking_userfault_fd() -> Uffd { + UffdBuilder::new() + // yes-blocking + .non_blocking(false) + .create() + .unwrap() +} + +#[tokio::test] +async fn test_poll_error_propagates_to_readable() { + // GIVEN: a blocking `userfault` fd. The only reason we use a `userfault` fd + // is that it is a reliable way of getting `EPOLLERR`. + + // From userfault(2) + // > If the `O_NONBLOCK` flag is not enabled, then poll(2) (always) + // indicates the file as having a POLLERR condition, and select(2) indicates + // the file descriptor as both readable and writable. + let async_fd = AsyncFd::with_interest(get_blocking_userfault_fd(), Interest::READABLE).unwrap(); + + // WHEN + let result = async_fd.readable().await; + + // THEN + assert_eq!(result.unwrap_err().kind(), std::io::ErrorKind::Other); +} + +#[tokio::test] +async fn test_poll_error_propagates_to_poll_read_ready() { + // GIVEN: a blocking `userfault` fd. The only reason we use a `userfault` fd + // is that it is a reliable way of getting `EPOLLERR`. + + // From userfault(2) + // > If the `O_NONBLOCK` flag is not enabled, then poll(2) (always) + // indicates the file as having a POLLERR condition, and select(2) indicates + // the file descriptor as both readable and writable. + let async_fd = AsyncFd::with_interest(get_blocking_userfault_fd(), Interest::READABLE).unwrap(); + + // WHEN + let result = futures::future::poll_fn(|cx| async_fd.poll_read_ready(cx)).await; + + // THEN + assert_eq!(result.unwrap_err().kind(), std::io::ErrorKind::Other); +} From 3b026beab896dc18ee38ff2066ec45db08777bea Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Sun, 20 Feb 2022 19:34:26 +0000 Subject: [PATCH 9/9] fix integration test and remove unit tests --- tokio/src/io/driver/scheduled_io.rs | 35 ----------------------------- tokio/tests/io_poll_error.rs | 3 ++- 2 files changed, 2 insertions(+), 36 deletions(-) diff --git a/tokio/src/io/driver/scheduled_io.rs b/tokio/src/io/driver/scheduled_io.rs index 32e80bb998b..db478a3edea 100644 --- a/tokio/src/io/driver/scheduled_io.rs +++ b/tokio/src/io/driver/scheduled_io.rs @@ -585,38 +585,3 @@ cfg_io_readiness! { unsafe impl Send for Readiness<'_> {} unsafe impl Sync for Readiness<'_> {} } - -#[cfg(test)] -mod tests { - - use super::*; - - fn unwrap_ready(p: Poll) -> T { - match p { - Poll::Pending => panic!("Expected a Ready"), - Poll::Ready(t) => t, - } - } - - #[test] - fn test_setting_error_awakes_poll_read_readiness() { - // GIVEN - let scheduled_io: ScheduledIo = Default::default(); - - // WHEN: marking an error and polling readiness - scheduled_io - .set_readiness(None, Tick::Set(0), |_| Ready::ERROR) - .unwrap(); - - let mut task = tokio_test::task::spawn(async { - let readiness = - crate::future::poll_fn(|cx| scheduled_io.poll_readiness(cx, Direction::Read)).await; - - readiness - }); - - // THEN - let readiness = unwrap_ready(task.poll()); - assert_eq!(readiness.unwrap_err().kind(), std::io::ErrorKind::Other); - } -} diff --git a/tokio/tests/io_poll_error.rs b/tokio/tests/io_poll_error.rs index 95799cbba32..25b90edd0a3 100644 --- a/tokio/tests/io_poll_error.rs +++ b/tokio/tests/io_poll_error.rs @@ -1,4 +1,5 @@ -#[cfg(target_os = "linux")] +#![cfg(target_os = "linux")] + use tokio::io::{unix::AsyncFd, Interest}; use userfaultfd::{Uffd, UffdBuilder};