Skip to content

Commit

Permalink
Merge pull request async-rs#1 from stjepang/cleanup-sys
Browse files Browse the repository at this point in the history
Cleanup sys module
  • Loading branch information
Stjepan Glavina authored Jul 5, 2020
2 parents 8a270f4 + 768647d commit 7f912b1
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 325 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/cross.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,7 @@ jobs:
- name: iOS
if: startsWith(matrix.os, 'macos')
run: cross build --target aarch64-apple-ios

# - name: illumos
# if: startsWith(matrix.os, 'ubuntu')
# run: cross build --target x86_64-unknown-illumos
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use socket2::{Domain, Protocol, Socket, Type};
use crate::parking::{Reactor, Source};

pub mod parking;
mod sys;

/// Fires at the chosen point in time.
///
Expand Down
208 changes: 161 additions & 47 deletions src/parking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,12 +758,9 @@ mod sys {
use std::convert::TryInto;
use std::io;
use std::os::unix::io::RawFd;
use std::ptr;
use std::time::Duration;

use crate::sys::epoll::{
epoll_create1, epoll_ctl, epoll_wait, EpollEvent, EpollFlags, EpollOp,
};

macro_rules! syscall {
($fn:ident $args:tt) => {{
let res = unsafe { libc::$fn $args };
Expand All @@ -781,7 +778,42 @@ mod sys {
}
impl Reactor {
pub fn new() -> io::Result<Reactor> {
let epoll_fd = epoll_create1()?;
// According to libuv, `EPOLL_CLOEXEC` is not defined on Android API < 21.
// But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on that platform, so we use it instead.
#[cfg(target_os = "android")]
const CLOEXEC: libc::c_int = libc::O_CLOEXEC;
#[cfg(not(target_os = "android"))]
const CLOEXEC: libc::c_int = libc::EPOLL_CLOEXEC;

let epoll_fd = unsafe {
// Check if the `epoll_create1` symbol is available on this platform.
let ptr = libc::dlsym(
libc::RTLD_DEFAULT,
"epoll_create1\0".as_ptr() as *const libc::c_char,
);

if ptr.is_null() {
// If not, use `epoll_create` and manually set `CLOEXEC`.
let fd = match libc::epoll_create(1024) {
-1 => return Err(io::Error::last_os_error()),
fd => fd,
};
let flags = libc::fcntl(fd, libc::F_GETFD);
libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC);
fd
} else {
// Use `epoll_create1` with `CLOEXEC`.
let epoll_create1 = std::mem::transmute::<
*mut libc::c_void,
unsafe extern "C" fn(libc::c_int) -> libc::c_int,
>(ptr);
match epoll_create1(CLOEXEC) {
-1 => return Err(io::Error::last_os_error()),
fd => fd,
}
}
};

let event_fd = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
let reactor = Reactor { epoll_fd, event_fd };
reactor.register(event_fd, !0)?;
Expand All @@ -791,8 +823,12 @@ mod sys {
pub fn register(&self, fd: RawFd, key: usize) -> io::Result<()> {
let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
let ev = &mut EpollEvent::new(0, key as u64);
epoll_ctl(self.epoll_fd, EpollOp::EpollCtlAdd, fd, Some(ev))
let mut ev = libc::epoll_event {
events: 0,
u64: key as u64,
};
syscall!(epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_ADD, fd, &mut ev))?;
Ok(())
}
pub fn reregister(&self, fd: RawFd, key: usize, read: bool, write: bool) -> io::Result<()> {
let mut flags = libc::EPOLLONESHOT;
Expand All @@ -802,11 +838,21 @@ mod sys {
if write {
flags |= write_flags();
}
let ev = &mut EpollEvent::new(flags, key as u64);
epoll_ctl(self.epoll_fd, EpollOp::EpollCtlMod, fd, Some(ev))
let mut ev = libc::epoll_event {
events: flags as _,
u64: key as u64,
};
syscall!(epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_MOD, fd, &mut ev))?;
Ok(())
}
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
epoll_ctl(self.epoll_fd, EpollOp::EpollCtlDel, fd, None)
syscall!(epoll_ctl(
self.epoll_fd,
libc::EPOLL_CTL_DEL,
fd,
ptr::null_mut()
))?;
Ok(())
}
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout_ms = timeout
Expand All @@ -819,7 +865,14 @@ mod sys {
})
.and_then(|t| t.as_millis().try_into().ok())
.unwrap_or(-1);
events.len = epoll_wait(self.epoll_fd, &mut events.list, timeout_ms)?;

let res = syscall!(epoll_wait(
self.epoll_fd,
events.list.as_mut_ptr() as *mut libc::epoll_event,
events.list.len() as libc::c_int,
timeout_ms as libc::c_int,
))?;
events.len = res as usize;

let mut buf = [0u8; 8];
let _ = syscall!(read(
Expand All @@ -841,28 +894,29 @@ mod sys {
Ok(())
}
}
fn read_flags() -> EpollFlags {
fn read_flags() -> libc::c_int {
libc::EPOLLIN | libc::EPOLLRDHUP | libc::EPOLLHUP | libc::EPOLLERR | libc::EPOLLPRI
}
fn write_flags() -> EpollFlags {
fn write_flags() -> libc::c_int {
libc::EPOLLOUT | libc::EPOLLHUP | libc::EPOLLERR
}

pub struct Events {
list: Box<[EpollEvent]>,
list: Box<[libc::epoll_event]>,
len: usize,
}
impl Events {
pub fn new() -> Events {
let list = vec![EpollEvent::empty(); 1000].into_boxed_slice();
let ev = libc::epoll_event { events: 0, u64: 0 };
let list = vec![ev; 1000].into_boxed_slice();
let len = 0;
Events { list, len }
}
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.list[..self.len].iter().map(|ev| Event {
readable: (ev.events() & read_flags()) != 0,
writable: (ev.events() & write_flags()) != 0,
key: ev.data() as usize,
readable: (ev.events as libc::c_int & read_flags()) != 0,
writable: (ev.events as libc::c_int & write_flags()) != 0,
key: ev.u64 as usize,
})
}
}
Expand All @@ -886,10 +940,9 @@ mod sys {
use std::io::{self, Read, Write};
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::UnixStream;
use std::ptr;
use std::time::Duration;

use crate::sys::event::{kevent_ts, kqueue, KEvent};

macro_rules! syscall {
($fn:ident $args:tt) => {{
let res = unsafe { libc::$fn $args };
Expand All @@ -908,7 +961,7 @@ mod sys {
}
impl Reactor {
pub fn new() -> io::Result<Reactor> {
let kqueue_fd = kqueue()?;
let kqueue_fd = syscall!(kqueue())?;
syscall!(fcntl(kqueue_fd, libc::F_SETFD, libc::FD_CLOEXEC))?;
let (read_stream, write_stream) = UnixStream::pair()?;
read_stream.set_nonblocking(true)?;
Expand Down Expand Up @@ -939,38 +992,78 @@ mod sys {
} else {
write_flags |= libc::EV_DELETE;
}
let udata = key as _;
let changelist = [
KEvent::new(fd as _, libc::EVFILT_READ, read_flags, 0, 0, udata),
KEvent::new(fd as _, libc::EVFILT_WRITE, write_flags, 0, 0, udata),
libc::kevent {
ident: fd as _,
filter: libc::EVFILT_READ,
flags: read_flags,
fflags: 0,
data: 0,
udata: key as _,
},
libc::kevent {
ident: fd as _,
filter: libc::EVFILT_WRITE,
flags: write_flags,
fflags: 0,
data: 0,
udata: key as _,
},
];
let mut eventlist = changelist;
kevent_ts(self.kqueue_fd, &changelist, &mut eventlist, None)?;
syscall!(kevent(
self.kqueue_fd,
changelist.as_ptr() as *const libc::kevent,
changelist.len() as _,
eventlist.as_mut_ptr() as *mut libc::kevent,
eventlist.len() as _,
ptr::null(),
))?;
for ev in &eventlist {
// Explanation for ignoring EPIPE: https://github.com/tokio-rs/mio/issues/582
let (flags, data) = (ev.flags(), ev.data());
if (flags & libc::EV_ERROR) == 1
&& data != 0
&& data != libc::ENOENT as _
&& data != libc::EPIPE as _
if (ev.flags & libc::EV_ERROR) != 0
&& ev.data != 0
&& ev.data != libc::ENOENT as _
&& ev.data != libc::EPIPE as _
{
return Err(io::Error::from_raw_os_error(data as _));
return Err(io::Error::from_raw_os_error(ev.data as _));
}
}
Ok(())
}
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
let flags = libc::EV_DELETE | libc::EV_RECEIPT;
let changelist = [
KEvent::new(fd as _, libc::EVFILT_WRITE, flags, 0, 0, 0),
KEvent::new(fd as _, libc::EVFILT_READ, flags, 0, 0, 0),
libc::kevent {
ident: fd as _,
filter: libc::EVFILT_READ,
flags: flags,
fflags: 0,
data: 0,
udata: 0 as _,
},
libc::kevent {
ident: fd as _,
filter: libc::EVFILT_WRITE,
flags: flags,
fflags: 0,
data: 0,
udata: 0 as _,
},
];
let mut eventlist = changelist;
kevent_ts(self.kqueue_fd, &changelist, &mut eventlist, None)?;
syscall!(kevent(
self.kqueue_fd,
changelist.as_ptr() as *const libc::kevent,
changelist.len() as _,
eventlist.as_mut_ptr() as *mut libc::kevent,
eventlist.len() as _,
ptr::null(),
))?;
for ev in &eventlist {
let (flags, data) = (ev.flags(), ev.data());
if (flags & libc::EV_ERROR == 1) && data != 0 && data != libc::ENOENT as _ {
return Err(io::Error::from_raw_os_error(data as _));
if (ev.flags & libc::EV_ERROR) != 0 && ev.data != 0 && ev.data != libc::ENOENT as _
{
return Err(io::Error::from_raw_os_error(ev.data as _));
}
}
Ok(())
Expand All @@ -980,7 +1073,20 @@ mod sys {
tv_sec: t.as_secs() as libc::time_t,
tv_nsec: t.subsec_nanos() as libc::c_long,
});
events.len = kevent_ts(self.kqueue_fd, &[], &mut events.list, timeout)?;
let changelist = [];
let eventlist = &mut events.list;
let res = syscall!(kevent(
self.kqueue_fd,
changelist.as_ptr() as *const libc::kevent,
changelist.len() as _,
eventlist.as_mut_ptr() as *mut libc::kevent,
eventlist.len() as _,
match &timeout {
None => ptr::null(),
Some(t) => t,
}
))?;
events.len = res as usize;

while (&self.read_stream).read(&mut [0; 64]).is_ok() {}
self.reregister(self.read_stream.as_raw_fd(), !0, true, false)?;
Expand All @@ -994,13 +1100,19 @@ mod sys {
}

pub struct Events {
list: Box<[KEvent]>,
list: Box<[libc::kevent]>,
len: usize,
}
impl Events {
pub fn new() -> Events {
let flags = 0;
let event = KEvent::new(0, 0, flags, 0, 0, 0);
let event = libc::kevent {
ident: 0 as _,
filter: 0,
flags: 0,
fflags: 0,
data: 0,
udata: 0 as _,
};
let list = vec![event; 1000].into_boxed_slice();
let len = 0;
Events { list, len }
Expand All @@ -1011,13 +1123,14 @@ mod sys {
//
// https://github.com/golang/go/commit/23aad448b1e3f7c3b4ba2af90120bde91ac865b4
self.list[..self.len].iter().map(|ev| Event {
readable: ev.filter() == libc::EVFILT_READ,
writable: ev.filter() == libc::EVFILT_WRITE
|| (ev.filter() == libc::EVFILT_READ && (ev.flags() & libc::EV_EOF) != 0),
key: ev.udata() as usize,
readable: ev.filter == libc::EVFILT_READ,
writable: ev.filter == libc::EVFILT_WRITE
|| (ev.filter == libc::EVFILT_READ && (ev.flags & libc::EV_EOF) != 0),
key: ev.udata as usize,
})
}
}
unsafe impl Send for Events {}
pub struct Event {
pub readable: bool,
pub writable: bool,
Expand All @@ -1031,6 +1144,7 @@ mod sys {
use std::convert::TryInto;
use std::io;
use std::os::windows::io::{AsRawSocket, RawSocket};
use std::ptr;
use std::time::Duration;

use wepoll_sys_stjepang as we;
Expand Down Expand Up @@ -1115,7 +1229,7 @@ mod sys {
self.handle,
we::EPOLL_CTL_DEL as libc::c_int,
sock as we::SOCKET,
0 as *mut we::epoll_event,
ptr::null_mut(),
))?;
Ok(())
}
Expand Down Expand Up @@ -1148,7 +1262,7 @@ mod sys {
self.handle as winapi::um::winnt::HANDLE,
0,
0,
0 as *mut _,
ptr::null_mut(),
);
}
Ok(())
Expand Down
Loading

0 comments on commit 7f912b1

Please sign in to comment.