Skip to content

Commit

Permalink
Upgrade to mio 0.8 (#326)
Browse files Browse the repository at this point in the history
This change upgrades `mio` for non-MacOS / non-Android unix platforms.
`mio` is used an abstraction layer over the various methods of doing
`epoll`, etc on Unix platforms.

There are a few notable changes this upgrade deals with:

- `mio` no longer supports level-triggered events. What this means is
  that instead of always delivering readable events for file
  descriptors when there is data left to read, an event is only
  delivered the first time new data becomes available. The consumer is
  expected to try to read from descriptor until it would block. This
  means we have to put a loop around calls to recv for each file
  descriptor. Note that this might change the order that messages
  arrive, since before each polling operation would only give one
  message per fd. Now all available messages are delivered per call to
  poll.
- The `mio` API has changed a bit. Now there's a poll registry and also
  `Ready` has been replaced by `Interest`.
  • Loading branch information
mrobinson authored Sep 10, 2023
1 parent 0b650d4 commit e0af749
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 35 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ tempfile = "3.4"
uuid = { version = "1", features = ["v4"] }

[target.'cfg(any(target_os = "linux", target_os = "openbsd", target_os = "freebsd"))'.dependencies]
mio = "0.6.11"
mio = { version = "0.8", features = ["os-ext"] }
sc = { version = "0.2.2", optional = true }

[dev-dependencies]
Expand Down
78 changes: 44 additions & 34 deletions src/platform/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use libc::{c_char, c_int, c_void, getsockopt, SO_LINGER, S_IFMT, S_IFSOCK};
use libc::{iovec, mode_t, msghdr, off_t, recvmsg, sendmsg};
use libc::{sa_family_t, setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t};
use libc::{EAGAIN, EWOULDBLOCK};
use mio::unix::EventedFd;
use mio::{Events, Poll, PollOpt, Ready, Token};
use mio::unix::SourceFd;
use mio::{Events, Interest, Poll, Token};
use std::cell::Cell;
use std::cmp;
use std::collections::HashMap;
Expand All @@ -29,6 +29,7 @@ use std::io;
use std::marker::PhantomData;
use std::mem;
use std::ops::{Deref, RangeFrom};
use std::os::fd::RawFd;
use std::ptr;
use std::slice;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -116,7 +117,7 @@ pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver), UnixError> {
#[derive(Clone, Copy)]
struct PollEntry {
pub id: u64,
pub fd: c_int,
pub fd: RawFd,
}

#[derive(PartialEq, Debug)]
Expand Down Expand Up @@ -508,35 +509,46 @@ impl OsIpcReceiverSet {
pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64, UnixError> {
let last_index = self.incrementor.next().unwrap();
let fd = receiver.consume_fd();
let io = EventedFd(&fd);
let fd_token = Token(fd as usize);
let poll_entry = PollEntry { id: last_index, fd };
self.poll
.register(&io, fd_token, Ready::readable(), PollOpt::level())?;
.registry()
.register(&mut SourceFd(&fd), fd_token, Interest::READABLE)?;
self.pollfds.insert(fd_token, poll_entry);
Ok(last_index)
}

pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>, UnixError> {
let mut selection_results = Vec::new();
let mut num_events = 0;
while num_events == 0 {
// Poll until we receive at least one event.
loop {
match self.poll.poll(&mut self.events, None) {
Ok(sz) => {
num_events = sz;
},
Err(ref e) => {
if e.kind() != io::ErrorKind::Interrupted {
Ok(()) if !self.events.is_empty() => break,
Ok(()) => {},
Err(ref error) => {
if error.kind() != io::ErrorKind::Interrupted {
return Err(UnixError::last());
}
},
}

if !self.events.is_empty() {
break;
}
}

for evt in self.events.iter() {
let evt_token = evt.token();
match (evt.readiness().is_readable(), self.pollfds.get(&evt_token)) {
(true, Some(&poll_entry)) => match recv(poll_entry.fd, BlockingMode::Blocking) {
let mut selection_results = Vec::new();
for event in self.events.iter() {
// We only register this `Poll` for readable events.
assert!(event.is_readable());

let event_token = event.token();
let poll_entry = self
.pollfds
.get(&event_token)
.expect("Got event for unknown token.")
.clone();
loop {
match recv(poll_entry.fd, BlockingMode::Nonblocking) {
Ok((data, channels, shared_memory_regions)) => {
selection_results.push(OsIpcSelectionResult::DataReceived(
poll_entry.id,
Expand All @@ -546,28 +558,26 @@ impl OsIpcReceiverSet {
));
},
Err(err) if err.channel_is_closed() => {
self.pollfds.remove(&evt_token).unwrap();
self.poll.deregister(&EventedFd(&poll_entry.fd)).unwrap();
self.pollfds.remove(&event_token).unwrap();
self.poll
.registry()
.deregister(&mut SourceFd(&poll_entry.fd))
.unwrap();
unsafe {
libc::close(poll_entry.fd);
}
selection_results.push(OsIpcSelectionResult::ChannelClosed(poll_entry.id))

selection_results.push(OsIpcSelectionResult::ChannelClosed(poll_entry.id));
break;
},
Err(UnixError::Errno(code)) if code == EWOULDBLOCK => {
// We tried to read another message from the file descriptor and
// it would have blocked, so we have exhausted all of the data
// pending to read.
break;
},
Err(err) => return Err(err),
},
(true, None) => {
panic!(
"Readable event for unknown token: {:?}, readiness: {:?}",
evt_token,
evt.readiness()
);
},
(false, _) => {
panic!(
"Received an event that was not readable for token: {:?}",
evt_token
)
},
}
}
}

Expand Down

0 comments on commit e0af749

Please sign in to comment.