Skip to content

Commit

Permalink
io: add Ready::ERROR and report error readiness (#5781)
Browse files Browse the repository at this point in the history
Add `Ready::ERROR` enabling callers to specify interest in error readiness. Some platforms use error
readiness notifications to notify of other events. For example, Linux uses error to notify receipt of
messages on a UDP socket's error queue.

Using error readiness is platform specific.

Closes #5716
  • Loading branch information
folkertdev authored Aug 16, 2023
1 parent 6e42c26 commit 10e141d
Show file tree
Hide file tree
Showing 3 changed files with 325 additions and 19 deletions.
184 changes: 168 additions & 16 deletions tokio/src/io/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,70 @@ use crate::io::ready::Ready;
use std::fmt;
use std::ops;

// These must be unique.
// same as mio
const READABLE: usize = 0b0001;
const WRITABLE: usize = 0b0010;
// The following are not available on all platforms.
#[cfg(target_os = "freebsd")]
const AIO: usize = 0b0100;
#[cfg(target_os = "freebsd")]
const LIO: usize = 0b1000;
#[cfg(any(target_os = "linux", target_os = "android"))]
const PRIORITY: usize = 0b0001_0000;
// error is available on all platforms, but behavior is platform-specific
// mio does not have this interest
const ERROR: usize = 0b0010_0000;

/// Readiness event interest.
///
/// Specifies the readiness events the caller is interested in when awaiting on
/// I/O resource readiness states.
#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct Interest(mio::Interest);
pub struct Interest(usize);

impl Interest {
// The non-FreeBSD definitions in this block are active only when
// building documentation.
cfg_aio! {
/// Interest for POSIX AIO.
#[cfg(target_os = "freebsd")]
pub const AIO: Interest = Interest(mio::Interest::AIO);
pub const AIO: Interest = Interest(AIO);

/// Interest for POSIX AIO.
#[cfg(not(target_os = "freebsd"))]
pub const AIO: Interest = Interest(mio::Interest::READABLE);
pub const AIO: Interest = Interest(READABLE);

/// Interest for POSIX AIO lio_listio events.
#[cfg(target_os = "freebsd")]
pub const LIO: Interest = Interest(mio::Interest::LIO);
pub const LIO: Interest = Interest(LIO);

/// Interest for POSIX AIO lio_listio events.
#[cfg(not(target_os = "freebsd"))]
pub const LIO: Interest = Interest(mio::Interest::READABLE);
pub const LIO: Interest = Interest(READABLE);
}

/// Interest in all readable events.
///
/// Readable interest includes read-closed events.
pub const READABLE: Interest = Interest(mio::Interest::READABLE);
pub const READABLE: Interest = Interest(READABLE);

/// Interest in all writable events.
///
/// Writable interest includes write-closed events.
pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
pub const WRITABLE: Interest = Interest(WRITABLE);

/// Interest in error events.
///
/// Passes error interest to the underlying OS selector.
/// Behavior is platform-specific, read your platform's documentation.
pub const ERROR: Interest = Interest(ERROR);

/// Returns a `Interest` set representing priority completion interests.
#[cfg(any(target_os = "linux", target_os = "android"))]
#[cfg_attr(docsrs, doc(cfg(any(target_os = "linux", target_os = "android"))))]
pub const PRIORITY: Interest = Interest(mio::Interest::PRIORITY);
pub const PRIORITY: Interest = Interest(PRIORITY);

/// Returns true if the value includes readable interest.
///
Expand All @@ -63,7 +84,7 @@ impl Interest {
/// assert!(both.is_readable());
/// ```
pub const fn is_readable(self) -> bool {
self.0.is_readable()
self.0 & READABLE != 0
}

/// Returns true if the value includes writable interest.
Expand All @@ -80,7 +101,34 @@ impl Interest {
/// assert!(both.is_writable());
/// ```
pub const fn is_writable(self) -> bool {
self.0.is_writable()
self.0 & WRITABLE != 0
}

/// Returns true if the value includes error interest.
///
/// # Examples
///
/// ```
/// use tokio::io::Interest;
///
/// assert!(Interest::ERROR.is_error());
/// assert!(!Interest::WRITABLE.is_error());
///
/// let combined = Interest::READABLE | Interest::ERROR;
/// assert!(combined.is_error());
/// ```
pub const fn is_error(self) -> bool {
self.0 & ERROR != 0
}

#[cfg(target_os = "freebsd")]
const fn is_aio(self) -> bool {
self.0 & AIO != 0
}

#[cfg(target_os = "freebsd")]
const fn is_lio(self) -> bool {
self.0 & LIO != 0
}

/// Returns true if the value includes priority interest.
Expand All @@ -99,7 +147,7 @@ impl Interest {
#[cfg(any(target_os = "linux", target_os = "android"))]
#[cfg_attr(docsrs, doc(cfg(any(target_os = "linux", target_os = "android"))))]
pub const fn is_priority(self) -> bool {
self.0.is_priority()
self.0 & PRIORITY != 0
}

/// Add together two `Interest` values.
Expand All @@ -116,12 +164,60 @@ impl Interest {
/// assert!(BOTH.is_readable());
/// assert!(BOTH.is_writable());
pub const fn add(self, other: Interest) -> Interest {
Interest(self.0.add(other.0))
Self(self.0 | other.0)
}

// This function must be crate-private to avoid exposing a `mio` dependency.
pub(crate) const fn to_mio(self) -> mio::Interest {
self.0
pub(crate) fn to_mio(self) -> mio::Interest {
fn mio_add(wrapped: &mut Option<mio::Interest>, add: mio::Interest) {
match wrapped {
Some(inner) => *inner |= add,
None => *wrapped = Some(add),
}
}

// mio does not allow and empty interest, so use None for empty
let mut mio = None;

if self.is_readable() {
mio_add(&mut mio, mio::Interest::READABLE);
}

if self.is_writable() {
mio_add(&mut mio, mio::Interest::WRITABLE);
}

#[cfg(any(target_os = "linux", target_os = "android"))]
if self.is_priority() {
mio_add(&mut mio, mio::Interest::PRIORITY);
}

#[cfg(target_os = "freebsd")]
if self.is_aio() {
mio_add(&mut mio, mio::Interest::AIO);
}

#[cfg(target_os = "freebsd")]
if self.is_lio() {
mio_add(&mut mio, mio::Interest::LIO);
}

if self.is_error() {
// There is no error interest in mio, because error events are always reported.
// But mio interests cannot be empty and an interest is needed just for the registeration.
//
// read readiness is filtered out in `Interest::mask` or `Ready::from_interest` if
// the read interest was not specified by the user.
mio_add(&mut mio, mio::Interest::READABLE);
}

// the default `mio::Interest::READABLE` should never be used in practice. Either
//
// - at least one tokio interest with a mio counterpart was used
// - only the error tokio interest was specified
//
// in both cases, `mio` is Some already
mio.unwrap_or(mio::Interest::READABLE)
}

pub(crate) fn mask(self) -> Ready {
Expand All @@ -130,6 +226,7 @@ impl Interest {
Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED,
#[cfg(any(target_os = "linux", target_os = "android"))]
Interest::PRIORITY => Ready::PRIORITY | Ready::READ_CLOSED,
Interest::ERROR => Ready::ERROR,
_ => Ready::EMPTY,
}
}
Expand All @@ -147,12 +244,67 @@ impl ops::BitOr for Interest {
impl ops::BitOrAssign for Interest {
#[inline]
fn bitor_assign(&mut self, other: Self) {
self.0 = (*self | other).0;
*self = *self | other
}
}

impl fmt::Debug for Interest {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(fmt)
let mut separator = false;

if self.is_readable() {
if separator {
write!(fmt, " | ")?;
}
write!(fmt, "READABLE")?;
separator = true;
}

if self.is_writable() {
if separator {
write!(fmt, " | ")?;
}
write!(fmt, "WRITABLE")?;
separator = true;
}

#[cfg(any(target_os = "linux", target_os = "android"))]
if self.is_priority() {
if separator {
write!(fmt, " | ")?;
}
write!(fmt, "PRIORITY")?;
separator = true;
}

#[cfg(target_os = "freebsd")]
if self.is_aio() {
if separator {
write!(fmt, " | ")?;
}
write!(fmt, "AIO")?;
separator = true;
}

#[cfg(target_os = "freebsd")]
if self.is_lio() {
if separator {
write!(fmt, " | ")?;
}
write!(fmt, "LIO")?;
separator = true;
}

if self.is_error() {
if separator {
write!(fmt, " | ")?;
}
write!(fmt, "ERROR")?;
separator = true;
}

let _ = separator;

Ok(())
}
}
35 changes: 32 additions & 3 deletions tokio/src/io/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const READ_CLOSED: usize = 0b0_0100;
const WRITE_CLOSED: usize = 0b0_1000;
#[cfg(any(target_os = "linux", target_os = "android"))]
const PRIORITY: usize = 0b1_0000;
const ERROR: usize = 0b10_0000;

/// Describes the readiness state of an I/O resources.
///
Expand Down Expand Up @@ -40,13 +41,17 @@ impl Ready {
#[cfg_attr(docsrs, doc(cfg(any(target_os = "linux", target_os = "android"))))]
pub const PRIORITY: Ready = Ready(PRIORITY);

/// Returns a `Ready` representing error readiness.
pub const ERROR: Ready = Ready(ERROR);

/// Returns a `Ready` representing readiness for all operations.
#[cfg(any(target_os = "linux", target_os = "android"))]
pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED | PRIORITY);
pub const ALL: Ready =
Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED | ERROR | PRIORITY);

/// Returns a `Ready` representing readiness for all operations.
#[cfg(not(any(target_os = "linux", target_os = "android")))]
pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED);
pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED | ERROR);

// Must remain crate-private to avoid adding a public dependency on Mio.
pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
Expand Down Expand Up @@ -79,6 +84,10 @@ impl Ready {
ready |= Ready::WRITE_CLOSED;
}

if event.is_error() {
ready |= Ready::ERROR;
}

#[cfg(any(target_os = "linux", target_os = "android"))]
{
if event.is_priority() {
Expand Down Expand Up @@ -182,6 +191,21 @@ impl Ready {
self.contains(Ready::PRIORITY)
}

/// Returns `true` if the value includes error `readiness`.
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(!Ready::EMPTY.is_error());
/// assert!(!Ready::WRITABLE.is_error());
/// assert!(Ready::ERROR.is_error());
/// ```
pub fn is_error(self) -> bool {
self.contains(Ready::ERROR)
}

/// Returns true if `self` is a superset of `other`.
///
/// `other` may represent more than one readiness operations, in which case
Expand Down Expand Up @@ -230,6 +254,10 @@ impl Ready {
ready |= Ready::READ_CLOSED;
}

if interest.is_error() {
ready |= Ready::ERROR;
}

ready
}

Expand Down Expand Up @@ -283,7 +311,8 @@ impl fmt::Debug for Ready {
fmt.field("is_readable", &self.is_readable())
.field("is_writable", &self.is_writable())
.field("is_read_closed", &self.is_read_closed())
.field("is_write_closed", &self.is_write_closed());
.field("is_write_closed", &self.is_write_closed())
.field("is_error", &self.is_error());

#[cfg(any(target_os = "linux", target_os = "android"))]
fmt.field("is_priority", &self.is_priority());
Expand Down
Loading

0 comments on commit 10e141d

Please sign in to comment.