Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: make EPOLLERR awake AsyncFd::readable #4444

Closed
wants to merge 9 commits into from
Closed
24 changes: 24 additions & 0 deletions tokio/src/io/driver/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const READABLE: usize = 0b0_01;
const WRITABLE: usize = 0b0_10;
const READ_CLOSED: usize = 0b0_0100;
const WRITE_CLOSED: usize = 0b0_1000;
const ERROR: usize = 0b1_0000;

/// Describes the readiness state of an I/O resources.
///
Expand All @@ -31,6 +32,9 @@ impl Ready {
/// Returns a `Ready` representing write closed readiness.
pub const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED);

/// Returns a `Ready` representing error readiness
pub const ERROR: Ready = Ready(ERROR);

/// Returns a `Ready` representing readiness for all operations.
pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED);

Expand Down Expand Up @@ -65,6 +69,10 @@ impl Ready {
ready |= Ready::WRITE_CLOSED;
}

if event.is_error() {
ready |= Ready::ERROR;
}

ready
}

Expand Down Expand Up @@ -144,6 +152,21 @@ impl Ready {
self.contains(Ready::WRITE_CLOSED)
}

/// Returns `true` if the value includes write-closed `readiness`.
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(!Ready::EMPTY.is_write_closed());
/// assert!(!Ready::WRITABLE.is_write_closed());
/// assert!(Ready::WRITE_CLOSED.is_write_closed());
/// ```
pub fn is_error(self) -> bool {
self.contains(Ready::ERROR)
}

/// Returns true if `self` is a superset of `other`.
///
/// `other` may represent more than one readiness operations, in which case
Expand Down Expand Up @@ -245,6 +268,7 @@ impl fmt::Debug for Ready {
.field("is_writable", &self.is_writable())
.field("is_read_closed", &self.is_read_closed())
.field("is_write_closed", &self.is_write_closed())
.field("is_error", &self.is_error())
.finish()
}
}
2 changes: 1 addition & 1 deletion tokio/src/io/driver/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ cfg_io_readiness! {
)));
}

Pin::new(&mut fut).poll(cx).map(Ok)
Pin::new(&mut fut).poll(cx)
}).await
}

Expand Down
88 changes: 64 additions & 24 deletions tokio/src/io/driver/scheduled_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,13 @@ cfg_io_readiness! {
/// The interest this waiter is waiting on.
interest: Interest,

is_ready: bool,
// TODO: the Readiness object can figure out this information by reading
// the ScheduledIO `readiness` value (and in fact it does in its future
// impl, before putting the waiter in the waitlist.). Why do we need
// this extra field here? Is it because once we have locked the waitlist
// (which we have to do anyway) it's cheaper to read this value than to
// atomically read the `readiness` of the ScheduledIO?
wakeup_type: Option<WakeupType>,

/// Should never be `!Unpin`.
_p: PhantomPinned,
Expand All @@ -81,6 +87,12 @@ cfg_io_readiness! {
Waiting,
Done,
}

#[derive(Debug)]
enum WakeupType {
Ready,
PollError
}
}

// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
Expand Down Expand Up @@ -220,7 +232,8 @@ impl ScheduledIo {
waiters.is_shutdown |= shutdown;

// check for AsyncRead slot
if ready.is_readable() {
if ready.is_readable() || ready.is_error() {
// TODO: figure out what to do for the AsyncRead and AsyncWrite slots
if let Some(waker) = waiters.reader.take() {
wakers.push(waker);
}
Expand All @@ -235,15 +248,23 @@ impl ScheduledIo {

#[cfg(feature = "net")]
'outer: loop {
let mut iter = waiters.list.drain_filter(|w| ready.satisfies(w.interest));

let is_error = ready.is_error();
// should we wakeup all waiters in case of error? I guess that
// depends on the semantics of EPOLLERR but I don't know yet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better to wake up all of them on error.

let mut iter = waiters
.list
.drain_filter(|w| ready.satisfies(w.interest) || is_error);
while wakers.can_push() {
match iter.next() {
Some(waiter) => {
let waiter = unsafe { &mut *waiter.as_ptr() };

if let Some(waker) = waiter.waker.take() {
waiter.is_ready = true;
waiter.wakeup_type = if is_error {
Some(WakeupType::PollError)
} else {
Some(WakeupType::Ready)
};
wakers.push(waker);
}
}
Expand All @@ -263,7 +284,6 @@ impl ScheduledIo {

// Release the lock before notifying
drop(waiters);

wakers.wake_all();
}

Expand Down Expand Up @@ -364,7 +384,7 @@ unsafe impl Sync for ScheduledIo {}
cfg_io_readiness! {
impl ScheduledIo {
/// An async version of `poll_readiness` which uses a linked list of wakers.
pub(crate) async fn readiness(&self, interest: Interest) -> ReadyEvent {
pub(crate) async fn readiness(&self, interest: Interest) -> std::io::Result<ReadyEvent> {
self.readiness_fut(interest).await
}

Expand All @@ -379,7 +399,7 @@ cfg_io_readiness! {
waiter: UnsafeCell::new(Waiter {
pointers: linked_list::Pointers::new(),
waker: None,
is_ready: false,
wakeup_type: None,
interest,
_p: PhantomPinned,
}),
Expand Down Expand Up @@ -407,7 +427,7 @@ cfg_io_readiness! {
// ===== impl Readiness =====

impl Future for Readiness<'_> {
type Output = ReadyEvent;
type Output = std::io::Result<ReadyEvent>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use std::sync::atomic::Ordering::SeqCst;
Expand All @@ -416,14 +436,20 @@ cfg_io_readiness! {
let me = self.get_unchecked_mut();
(&me.scheduled_io, &mut me.state, &me.waiter)
};

loop {
match *state {
State::Init => {
// Optimistically check existing readiness
let curr = scheduled_io.readiness.load(SeqCst);
let ready = Ready::from_usize(READINESS.unpack(curr));

// the information is read via the `curr` in the scheduled io
if ready.is_error() {
return Poll::Ready(Err(
std::io::Error::new(std::io::ErrorKind::Other, "Polling error")
));
}
Comment on lines +460 to +465
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could get a better error for the user here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As in trying to find the underlying cause of the polling error? The issue contained some discussion on that. See #4349 (comment) and #4349 (comment)

The way I see it this interface would just return something along the lines of "Polling error" and then the user of this interface (who knows what kind of FD they are working with) would do whatever is needed to recover the underlying error (using SO_ERROR for example, if dealing with a socket).

I wish there was an ErrorKind::PollingError though, but maybe it's just that I don't know enough rust error handling to know how to expose this error to the user of the interface?


// Safety: `waiter.interest` never changes
let interest = unsafe { (*waiter.get()).interest };
let ready = ready.intersection(interest);
Expand All @@ -432,7 +458,7 @@ cfg_io_readiness! {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
return Poll::Ready(ReadyEvent { tick, ready });
return Poll::Ready(Ok(ReadyEvent { tick, ready }));
}

// Wasn't ready, take the lock (and check again while locked).
Expand All @@ -451,7 +477,7 @@ cfg_io_readiness! {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
return Poll::Ready(ReadyEvent { tick, ready });
return Poll::Ready(Ok(ReadyEvent { tick, ready }));
}

// Not ready even after locked, insert into list...
Expand Down Expand Up @@ -480,18 +506,28 @@ cfg_io_readiness! {
// Safety: called while locked
let w = unsafe { &mut *waiter.get() };

if w.is_ready {
// Our waker has been notified.
*state = State::Done;
} else {
// Update the waker, if necessary.
if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
w.waker = Some(cx.waker().clone());
match w.wakeup_type {
Some(WakeupType::Ready) => {
// Our waker has been notified.
*state = State::Done;
},
Some(WakeupType::PollError) => {
return Poll::Ready(Err(
std::io::Error::new(
std::io::ErrorKind::Other,
"Polling error"
)
));
},
None => {
// Update the waker, if necessary.
if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
w.waker = Some(cx.waker().clone());
}

return Poll::Pending;
}

return Poll::Pending;
}

// Explicit drop of the lock to indicate the scope that the
// lock is held. Because holding the lock is required to
// ensure safe access to fields not held within the lock, it
Expand All @@ -503,12 +539,16 @@ cfg_io_readiness! {
let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8;

// Safety: State::Done means it is no longer shared
// TODO: I'm not sure why this safety claim is true. What about
// it being `Done` prevents it from being shared? As far as I can tell
// the `wake0` method in the scheduledio doesn't care about this being
// Done?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it's a bit unclear, but I think it's correct. How can it transition to Done while being shared?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me get back to you on that, I have to think through it

let w = unsafe { &mut *waiter.get() };

return Poll::Ready(ReadyEvent {
return Poll::Ready(Ok(ReadyEvent {
tick,
ready: Ready::from_interest(w.interest),
});
}));
}
}
}
Expand Down