From 131c301fa8314408b872b75670f9310da8e3ef7a Mon Sep 17 00:00:00 2001 From: Markus Westerlind Date: Tue, 13 Feb 2018 14:01:52 +0100 Subject: [PATCH] Upgrade tokio_core dependency to use tokio 0.1 https://github.com/tokio-rs/tokio-rfcs/pull/2 --- .travis.yml | 1 + Cargo.toml | 58 ++--- src/frame.rs | 81 ++++++- src/lib.rs | 656 ++++++++++++++++++++++++++++++++++----------------- src/ucred.rs | 22 +- 5 files changed, 563 insertions(+), 255 deletions(-) diff --git a/.travis.yml b/.travis.yml index b0f8a66..bf4b396 100644 --- a/.travis.yml +++ b/.travis.yml @@ -18,6 +18,7 @@ matrix: script: - cargo test + - cargo test --features unstable-futures env: global: diff --git a/Cargo.toml b/Cargo.toml index feb2cef..24e4d65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,27 +1,31 @@ -[package] -name = "tokio-uds" -version = "0.1.7" -authors = ["Alex Crichton "] -license = "MIT/Apache-2.0" -repository = "https://github.com/tokio-rs/tokio-uds" -homepage = "https://github.com/tokio-rs/tokio-uds" -documentation = "https://docs.rs/tokio-uds" -description = """ -Unix Domain sockets for Tokio -""" -categories = ["asynchronous"] - -[badges] -travis-ci = { repository = "tokio-rs/tokio-uds" } -appveyor = { repository = "alexcrichton/tokio-uds" } - -[dependencies] -bytes = "0.4" -futures = "0.1.11" -iovec = "0.1" -libc = "0.2" -log = "0.4" -mio = "0.6.5" -mio-uds = "0.6.4" -tokio-core = "0.1" -tokio-io = "0.1" +[package] +name = "tokio-uds" +version = "0.1.7" +authors = ["Alex Crichton "] +license = "MIT/Apache-2.0" +repository = "https://github.com/tokio-rs/tokio-uds" +homepage = "https://github.com/tokio-rs/tokio-uds" +documentation = "https://docs.rs/tokio-uds" +description = """ +Unix Domain sockets for Tokio +""" +categories = ["asynchronous"] + +[badges] +travis-ci = { repository = "tokio-rs/tokio-uds" } +appveyor = { repository = "alexcrichton/tokio-uds" } + +[dependencies] +bytes = "0.4" +futures = "0.1" +futures2 = { version = "0.1.0", optional = true } +iovec = "0.1" +libc = "0.2" +log = "0.4" +mio = "0.6.14" +mio-uds = "0.6.4" +tokio-reactor = { git = "https://github.com/tokio-rs/tokio", features = ["unstable-futures"] } +tokio-io = "0.1" + +[features] +unstable-futures = ["futures2"] diff --git a/src/frame.rs b/src/frame.rs index a1e288a..bac8bc4 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -2,7 +2,10 @@ use std::io; use std::os::unix::net::SocketAddr; use std::path::PathBuf; -use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink}; +use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; + +#[cfg(feature = "unstable-futures")] +use futures2::{self, task}; use UnixDatagram; @@ -50,8 +53,7 @@ pub trait UnixDatagramCodec { /// /// The encode method also determines the destination to which the buffer /// should be directed, which will be returned as a `SocketAddr`. - fn encode(&mut self, msg: Self::Out, buf: &mut Vec) - -> io::Result; + fn encode(&mut self, msg: Self::Out, buf: &mut Vec) -> io::Result; } /// A unified `Stream` and `Sink` interface to an underlying @@ -73,7 +75,7 @@ impl Stream for UnixDatagramFramed { type Error = io::Error; fn poll(&mut self) -> Poll, io::Error> { - let (n, addr) = try_nb!(self.socket.recv_from(&mut self.rd)); + let (n, addr) = try_ready!(self.socket.recv_from(&mut self.rd)); trace!("received {} bytes, decoding", n); let frame = try!(self.codec.decode(&addr, &self.rd[..n])); trace!("frame decoded from buffer"); @@ -81,6 +83,20 @@ impl Stream for UnixDatagramFramed { } } +#[cfg(feature = "unstable-futures")] +impl futures2::Stream for UnixDatagramFramed { + type Item = C::In; + type Error = io::Error; + + fn poll_next(&mut self, cx: &mut task::Context) -> futures2::Poll, io::Error> { + let (n, addr) = try_ready2!(self.socket.recv_from2(cx, &mut self.rd)); + trace!("received {} bytes, decoding", n); + let frame = try!(self.codec.decode(&addr, &self.rd[..n])); + trace!("frame decoded from buffer"); + Ok(futures2::Async::Ready(Some(frame))) + } +} + impl Sink for UnixDatagramFramed { type SinkItem = C::Out; type SinkError = io::Error; @@ -101,19 +117,21 @@ impl Sink for UnixDatagramFramed { trace!("flushing framed transport"); if self.wr.is_empty() { - return Ok(Async::Ready(())) + return Ok(Async::Ready(())); } trace!("writing; remaining={}", self.wr.len()); - let n = try_nb!(self.socket.send_to(&self.wr, &self.out_addr)); + let n = try_ready!(self.socket.send_to(&self.wr, &self.out_addr)); trace!("written {}", n); let wrote_all = n == self.wr.len(); self.wr.clear(); if wrote_all { Ok(Async::Ready(())) } else { - Err(io::Error::new(io::ErrorKind::Other, - "failed to write entire datagram to socket")) + Err(io::Error::new( + io::ErrorKind::Other, + "failed to write entire datagram to socket", + )) } } @@ -123,6 +141,53 @@ impl Sink for UnixDatagramFramed { } } +#[cfg(feature = "unstable-futures")] +impl futures2::Sink for UnixDatagramFramed { + type SinkItem = C::Out; + type SinkError = io::Error; + + fn poll_ready(&mut self, cx: &mut task::Context) -> futures2::Poll<(), io::Error> { + if self.wr.len() > 0 { + try!(self.poll_flush(cx)); + if self.wr.len() > 0 { + return Ok(futures2::Async::Pending); + } + } + Ok(().into()) + } + + fn start_send(&mut self, item: C::Out) -> Result<(), io::Error> { + self.out_addr = try!(self.codec.encode(item, &mut self.wr)); + Ok(()) + } + + fn poll_flush(&mut self, cx: &mut task::Context) -> futures2::Poll<(), io::Error> { + trace!("flushing framed transport"); + + if self.wr.is_empty() { + return Ok(futures2::Async::Ready(())); + } + + trace!("writing; remaining={}", self.wr.len()); + let n = try_ready2!(self.socket.send_to2(cx, &self.wr, &self.out_addr)); + trace!("written {}", n); + let wrote_all = n == self.wr.len(); + self.wr.clear(); + if wrote_all { + Ok(futures2::Async::Ready(())) + } else { + Err(io::Error::new( + io::ErrorKind::Other, + "failed to write entire datagram to socket", + )) + } + } + + fn poll_close(&mut self, cx: &mut task::Context) -> futures2::Poll<(), io::Error> { + self.poll_flush(cx) + } +} + pub fn new(socket: UnixDatagram, codec: C) -> UnixDatagramFramed { UnixDatagramFramed { socket: socket, diff --git a/src/lib.rs b/src/lib.rs index d28a940..c814378 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,15 +14,17 @@ extern crate bytes; #[macro_use] extern crate futures; +#[cfg(feature = "unstable-futures")] +extern crate futures2; extern crate iovec; extern crate libc; #[macro_use] -extern crate tokio_core; -extern crate tokio_io; +extern crate log; extern crate mio; extern crate mio_uds; #[macro_use] -extern crate log; +extern crate tokio_io; +extern crate tokio_reactor; use std::fmt; use std::io::{self, Read, Write}; @@ -33,33 +35,76 @@ use std::os::unix::prelude::*; use std::path::Path; use bytes::{Buf, BufMut}; -use futures::{Future, Poll, Async, Stream}; -use futures::sync::oneshot; +use futures::{Async, Future, Poll, Stream}; +#[cfg(feature = "unstable-futures")] +use futures2::task; +use tokio_io::{AsyncRead, AsyncWrite}; use iovec::IoVec; -use tokio_core::reactor::{PollEvented, Handle}; -#[allow(deprecated)] -use tokio_core::io::Io; -use tokio_io::{IoStream, AsyncRead, AsyncWrite}; +use tokio_reactor::{Handle, PollEvented}; +use mio::Ready; + +#[cfg(feature = "unstable-futures")] +macro_rules! try_ready2 { + ($e: expr) => { + match $e { + Ok($crate::futures2::Async::Ready(t)) => t, + Ok($crate::futures2::Async::Pending) => return Ok($crate::futures2::Async::Pending), + Err(e) => return Err(From::from(e)), + } + }; +} mod frame; -pub use frame::{UnixDatagramFramed, UnixDatagramCodec}; +pub use frame::{UnixDatagramCodec, UnixDatagramFramed}; mod ucred; pub use ucred::UCred; -fn would_block() -> io::Error { - io::Error::new(io::ErrorKind::WouldBlock, "would block") +#[cfg(feature = "unstable-futures")] +fn lift_async(old: futures::Async) -> futures2::Async { + match old { + futures::Async::Ready(x) => futures2::Async::Ready(x), + futures::Async::NotReady => futures2::Async::Pending, + } +} + +/// Stream of listeners +pub struct Incoming { + inner: UnixListener, +} + +impl Stream for Incoming { + type Item = (UnixStream, SocketAddr); + type Error = io::Error; + + fn poll(&mut self) -> Poll, io::Error> { + Ok(Some(try_nb!(self.inner.accept())).into()) + } +} + +#[cfg(feature = "unstable-futures")] +impl futures2::Stream for Incoming { + type Item = (UnixStream, SocketAddr); + type Error = io::Error; + + fn poll_next( + &mut self, + cx: &mut task::Context, + ) -> futures2::Poll, io::Error> { + Ok(Some(try_ready2!(self.inner.poll_accept2(cx))).into()) + } } /// A Unix socket which can accept connections from other unix sockets. pub struct UnixListener { io: PollEvented, - pending_accept: Option>>, + handle: Handle, } impl UnixListener { /// Creates a new `UnixListener` bound to the specified path. - pub fn bind

(path: P, handle: &Handle) -> io::Result - where P: AsRef + pub fn bind

(path: P, handle: Handle) -> io::Result + where + P: AsRef, { UnixListener::_bind(path.as_ref(), handle) } @@ -69,23 +114,21 @@ impl UnixListener { /// /// The returned listener will be associated with the given event loop /// specified by `handle` and is ready to perform I/O. - pub fn from_listener(listener: net::UnixListener, handle: &Handle) - -> io::Result { + pub fn from_listener(listener: net::UnixListener, handle: Handle) -> io::Result { let s = try!(mio_uds::UnixListener::from_listener(listener)); UnixListener::new(s, handle) } - fn _bind(path: &Path, handle: &Handle) -> io::Result { + fn _bind(path: &Path, handle: Handle) -> io::Result { let s = try!(mio_uds::UnixListener::bind(path)); UnixListener::new(s, handle) } - fn new(listener: mio_uds::UnixListener, - handle: &Handle) -> io::Result { - let io = try!(PollEvented::new(listener, handle)); + fn new(listener: mio_uds::UnixListener, handle: Handle) -> io::Result { + let io = try!(PollEvented::new_with_handle(listener, &handle)); Ok(UnixListener { io: io, - pending_accept: None, + handle: handle, }) } @@ -95,8 +138,13 @@ impl UnixListener { } /// Test whether this socket is ready to be read or not. - pub fn poll_read(&self) -> Async<()> { - self.io.poll_read() + #[cfg(feature = "unstable-futures")] + pub fn poll_read_ready2( + &mut self, + cx: &mut task::Context, + ready: Ready, + ) -> futures2::Poll { + self.io.poll_read_ready2(cx, ready) } /// Returns the value of the `SO_ERROR` option. @@ -124,74 +172,68 @@ impl UnixListener { /// implementation of a `Future::poll`, if necessary. pub fn accept(&mut self) -> io::Result<(UnixStream, SocketAddr)> { loop { - if let Some(mut pending) = self.pending_accept.take() { - match pending.poll().expect("shouldn't be canceled") { - Async::NotReady => { - self.pending_accept = Some(pending); - return Err(would_block()) - }, - Async::Ready(r) => return r, - } - } - - if let Async::NotReady = self.io.poll_read() { - return Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")) + if let Async::NotReady = self.io.poll_read_ready(Ready::readable())? { + return Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")); } match try!(self.io.get_ref().accept()) { None => { - self.io.need_read(); - return Err(io::Error::new(io::ErrorKind::WouldBlock, - "not ready")) + self.io.clear_read_ready(Ready::readable())?; + return Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")); } Some((sock, addr)) => { - // Fast path if we haven't left the event loop - if let Some(handle) = self.io.remote().handle() { - let io = try!(PollEvented::new(sock, &handle)); - return Ok((UnixStream { io: io }, addr)) - } - - // If we're off the event loop then send the socket back - // over there to get registered and then we'll get it back - // eventually. - let (tx, rx) = oneshot::channel(); - let remote = self.io.remote().clone(); - remote.spawn(move |handle| { - let res = PollEvented::new(sock, handle) - .map(move |io| { - (UnixStream { io: io }, addr) - }); - drop(tx.send(res)); - Ok(()) - }); - self.pending_accept = Some(rx); - // continue to polling the `rx` at the beginning of the loop + let io = try!(PollEvented::new_with_handle(sock, &self.handle)); + return Ok((UnixStream { io: io }, addr)); } } } } + /// Attempt to accept a connection and create a new connected `UnixStream` + /// if successful. + /// + /// This function will attempt an accept operation, but will not block + /// waiting for it to complete. If the operation would block then a "would + /// block" error is returned. Additionally, if this method would block, it + /// registers the current task to receive a notification when it would + /// otherwise not block. + /// + /// Note that typically for simple usage it's easier to treat incoming + /// connections as a `Stream` of `UnixStream`s with the `incoming` method + /// below. + /// + /// # Panics + /// + /// This function will panic if it is called outside the context of a + /// future's task. It's recommended to only call this from the + /// implementation of a `Future::poll`, if necessary. + #[cfg(feature = "unstable-futures")] + pub fn poll_accept2( + &mut self, + cx: &mut task::Context, + ) -> futures2::Poll<(UnixStream, SocketAddr), io::Error> { + use futures2::Async; + try_ready2!(self.io.poll_read_ready2(cx, Ready::readable())); + + match try!(self.io.get_ref().accept()) { + None => { + self.io.clear_read_ready(Ready::readable())?; + Ok(Async::Pending) + } + Some((sock, addr)) => { + let io = try!(PollEvented::new_with_handle(sock, &self.handle)); + Ok(Async::Ready((UnixStream { io: io }, addr))) + } + } + } /// Consumes this listener, returning a stream of the sockets this listener /// accepts. /// /// This method returns an implementation of the `Stream` trait which /// resolves to the sockets the are accepted on this listener. - pub fn incoming(self) -> IoStream<(UnixStream, SocketAddr)> { - struct Incoming { - inner: UnixListener, - } - - impl Stream for Incoming { - type Item = (UnixStream, SocketAddr); - type Error = io::Error; - - fn poll(&mut self) -> Poll, io::Error> { - Ok(Some(try_nb!(self.inner.accept())).into()) - } - } - - Incoming { inner: self }.boxed() + pub fn incoming(self) -> Incoming { + Incoming { inner: self } } } @@ -220,10 +262,23 @@ impl UnixStream { /// Connects to the socket named by `path`. /// /// This function will create a new unix socket and connect to the path - /// specified, performing associating the returned stream with the provided + /// specified, associating the returned stream with the default event loop's + /// handle. + pub fn connect

(p: P) -> io::Result + where + P: AsRef, + { + UnixStream::connect_handle(p.as_ref(), &Handle::default()) + } + + /// Connects to the socket named by `path`. + /// + /// This function will create a new unix socket and connect to the path + /// specified, associating the returned stream with the provided /// event loop's handle. - pub fn connect

(p: P, handle: &Handle) -> io::Result - where P: AsRef + pub fn connect_handle

(p: P, handle: &Handle) -> io::Result + where + P: AsRef, { UnixStream::_connect(p.as_ref(), handle) } @@ -238,8 +293,7 @@ impl UnixStream { /// /// The returned stream will be associated with the given event loop /// specified by `handle` and is ready to perform I/O. - pub fn from_stream(stream: net::UnixStream, handle: &Handle) - -> io::Result { + pub fn from_stream(stream: net::UnixStream, handle: &Handle) -> io::Result { let s = try!(mio_uds::UnixStream::from_stream(stream)); UnixStream::new(s, handle) } @@ -257,42 +311,38 @@ impl UnixStream { Ok((a, b)) } - fn new(stream: mio_uds::UnixStream, handle: &Handle) - -> io::Result { - let io = try!(PollEvented::new(stream, handle)); + fn new(stream: mio_uds::UnixStream, handle: &Handle) -> io::Result { + let io = try!(PollEvented::new_with_handle(stream, handle)); Ok(UnixStream { io: io }) } - /// Indicates to this source of events that the corresponding I/O object is - /// no longer readable, but it needs to be. - /// - /// # Panics - /// - /// This function will panic if called outside the context of a future's - /// task. - pub fn need_read(&self) { - self.io.need_read() + /// Test whether this socket is ready to be read or not. + pub fn poll_read_ready(&self, ready: Ready) -> Poll { + self.io.poll_read_ready(ready) } - /// Indicates to this source of events that the corresponding I/O object is - /// no longer writable, but it needs to be. - /// - /// # Panics - /// - /// This function will panic if called outside the context of a future's - /// task. - pub fn need_write(&self) { - self.io.need_write() + /// Test whether this socket is ready to be written to or not. + pub fn poll_write_ready(&self) -> Poll { + self.io.poll_write_ready() } /// Test whether this socket is ready to be read or not. - pub fn poll_read(&self) -> Async<()> { - self.io.poll_read() + #[cfg(feature = "unstable-futures")] + pub fn poll_read_ready2( + &mut self, + cx: &mut task::Context, + ready: Ready, + ) -> futures2::Poll { + self.io.poll_read_ready2(cx, ready) } /// Test whether this socket is ready to be written to or not. - pub fn poll_write(&self) -> Async<()> { - self.io.poll_write() + #[cfg(feature = "unstable-futures")] + pub fn poll_write_ready2( + &mut self, + cx: &mut task::Context, + ) -> futures2::Poll { + self.io.poll_write_ready2(cx) } /// Returns the socket address of the local half of this connection. @@ -340,17 +390,6 @@ impl Write for UnixStream { } } -#[allow(deprecated)] -impl Io for UnixStream { - fn poll_read(&mut self) -> Async<()> { - ::poll_read(self) - } - - fn poll_write(&mut self) -> Async<()> { - ::poll_write(self) - } -} - impl AsyncRead for UnixStream { unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool { false @@ -387,14 +426,37 @@ impl<'a> Write for &'a UnixStream { } } -#[allow(deprecated)] -impl<'a> Io for &'a UnixStream { - fn poll_read(&mut self) -> Async<()> { - ::poll_read(self) +#[cfg(feature = "unstable-futures")] +impl futures2::io::AsyncRead for UnixStream { + fn poll_read( + &mut self, + cx: &mut task::Context, + buf: &mut [u8], + ) -> futures2::Poll { + self.io.poll_read(cx, buf) } - fn poll_write(&mut self) -> Async<()> { - ::poll_write(self) + unsafe fn initializer(&self) -> futures2::io::Initializer { + futures2::io::AsyncRead::initializer(&self.io) + } +} + +#[cfg(feature = "unstable-futures")] +impl futures2::io::AsyncWrite for UnixStream { + fn poll_write( + &mut self, + cx: &mut task::Context, + buf: &[u8], + ) -> futures2::Poll { + self.io.poll_write(cx, buf) + } + + fn poll_flush(&mut self, cx: &mut task::Context) -> futures2::Poll<(), io::Error> { + self.io.poll_flush(cx) + } + + fn poll_close(&mut self, cx: &mut task::Context) -> futures2::Poll<(), io::Error> { + self.io.poll_close(cx) } } @@ -419,18 +481,28 @@ unsafe fn read_ready(buf: &mut B, raw_fd: RawFd) -> isize { let b15: &mut [u8] = &mut [0]; let b16: &mut [u8] = &mut [0]; let mut bufs: [&mut IoVec; 16] = [ - b1.into(), b2.into(), b3.into(), b4.into(), - b5.into(), b6.into(), b7.into(), b8.into(), - b9.into(), b10.into(), b11.into(), b12.into(), - b13.into(), b14.into(), b15.into(), b16.into(), + b1.into(), + b2.into(), + b3.into(), + b4.into(), + b5.into(), + b6.into(), + b7.into(), + b8.into(), + b9.into(), + b10.into(), + b11.into(), + b12.into(), + b13.into(), + b14.into(), + b15.into(), + b16.into(), ]; let n = buf.bytes_vec_mut(&mut bufs); let iovecs = iovec::unix::as_os_slice_mut(&mut bufs[..n]); - libc::readv(raw_fd, - iovecs.as_ptr(), - iovecs.len() as i32) + libc::readv(raw_fd, iovecs.as_ptr(), iovecs.len() as i32) } impl<'a> AsyncRead for &'a UnixStream { @@ -439,15 +511,15 @@ impl<'a> AsyncRead for &'a UnixStream { } fn read_buf(&mut self, buf: &mut B) -> Poll { - if let Async::NotReady = ::poll_read(self) { - return Ok(Async::NotReady) + if let Async::NotReady = ::poll_read_ready(self, Ready::readable())? { + return Ok(Async::NotReady); } unsafe { let r = read_ready(buf, self.as_raw_fd()); if r == -1 { let e = io::Error::last_os_error(); if e.kind() == io::ErrorKind::WouldBlock { - self.io.need_read(); + self.io.clear_write_ready()?; Ok(Async::NotReady) } else { Err(e) @@ -468,18 +540,14 @@ unsafe fn write_ready(buf: &mut B, raw_fd: RawFd) -> isize { static DUMMY: &[u8] = &[0]; let iovec = <&IoVec>::from(DUMMY); let mut bufs = [ - iovec, iovec, iovec, iovec, - iovec, iovec, iovec, iovec, - iovec, iovec, iovec, iovec, - iovec, iovec, iovec, iovec, + iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, iovec, + iovec, iovec, iovec, ]; let n = buf.bytes_vec(&mut bufs); let iovecs = iovec::unix::as_os_slice(&bufs[..n]); - libc::writev(raw_fd, - iovecs.as_ptr(), - iovecs.len() as i32) + libc::writev(raw_fd, iovecs.as_ptr(), iovecs.len() as i32) } impl<'a> AsyncWrite for &'a UnixStream { @@ -488,15 +556,15 @@ impl<'a> AsyncWrite for &'a UnixStream { } fn write_buf(&mut self, buf: &mut B) -> Poll { - if let Async::NotReady = ::poll_write(self) { - return Ok(Async::NotReady) + if let Async::NotReady = ::poll_write_ready(self)? { + return Ok(Async::NotReady); } unsafe { let r = write_ready(buf, self.as_raw_fd()); if r == -1 { let e = io::Error::last_os_error(); if e.kind() == io::ErrorKind::WouldBlock { - self.io.need_write(); + self.io.clear_write_ready()?; Ok(Async::NotReady) } else { Err(e) @@ -530,7 +598,8 @@ pub struct UnixDatagram { impl UnixDatagram { /// Creates a new `UnixDatagram` bound to the specified path. pub fn bind

(path: P, handle: &Handle) -> io::Result - where P: AsRef + where + P: AsRef, { UnixDatagram::_bind(path.as_ref(), handle) } @@ -557,15 +626,13 @@ impl UnixDatagram { /// /// The returned datagram will be associated with the given event loop /// specified by `handle` and is ready to perform I/O. - pub fn from_datagram(datagram: net::UnixDatagram, handle: &Handle) - -> io::Result { + pub fn from_datagram(datagram: net::UnixDatagram, handle: &Handle) -> io::Result { let s = try!(mio_uds::UnixDatagram::from_datagram(datagram)); UnixDatagram::new(s, handle) } - fn new(socket: mio_uds::UnixDatagram, handle: &Handle) - -> io::Result { - let io = try!(PollEvented::new(socket, handle)); + fn new(socket: mio_uds::UnixDatagram, handle: &Handle) -> io::Result { + let io = try!(PollEvented::new_with_handle(socket, handle)); Ok(UnixDatagram { io: io }) } @@ -583,36 +650,14 @@ impl UnixDatagram { self.io.get_ref().connect(path) } - /// Indicates to this source of events that the corresponding I/O object is - /// no longer readable, but it needs to be. - /// - /// # Panics - /// - /// This function will panic if called outside the context of a future's - /// task. - pub fn need_read(&self) { - self.io.need_read() - } - - /// Indicates to this source of events that the corresponding I/O object is - /// no longer writable, but it needs to be. - /// - /// # Panics - /// - /// This function will panic if called outside the context of a future's - /// task. - pub fn need_write(&self) { - self.io.need_write() - } - /// Test whether this socket is ready to be read or not. - pub fn poll_read(&self) -> Async<()> { - self.io.poll_read() + pub fn poll_read_ready(&self, ready: Ready) -> Poll { + self.io.poll_read_ready(ready) } /// Test whether this socket is ready to be written to or not. - pub fn poll_write(&self) -> Async<()> { - self.io.poll_write() + pub fn poll_write_ready(&self) -> Poll { + self.io.poll_write_ready() } /// Returns the local address that this socket is bound to. @@ -631,34 +676,72 @@ impl UnixDatagram { /// /// On success, returns the number of bytes read and the address from /// whence the data came. - pub fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - if self.io.poll_read().is_not_ready() { - return Err(would_block()) + pub fn recv_from(&mut self, buf: &mut [u8]) -> Poll<(usize, SocketAddr), io::Error> { + if self.io.poll_read_ready(Ready::readable())?.is_not_ready() { + return Ok(Async::NotReady); } let r = self.io.get_ref().recv_from(buf); if is_wouldblock(&r) { - self.io.need_read(); + self.io.clear_read_ready(Ready::readable())?; } - return r + r.map(Async::Ready) } /// Receives data from the socket. /// /// On success, returns the number of bytes read. - pub fn recv(&self, buf: &mut [u8]) -> io::Result { - if self.io.poll_read().is_not_ready() { - return Err(would_block()) + pub fn recv(&mut self, buf: &mut [u8]) -> Poll { + if self.io.poll_read_ready(Ready::readable())?.is_not_ready() { + return Ok(Async::NotReady); } let r = self.io.get_ref().recv(buf); if is_wouldblock(&r) { - self.io.need_read(); + self.io.clear_read_ready(Ready::readable())?; + } + r.map(Async::Ready) + } + + /// Receives data from the socket. + /// + /// On success, returns the number of bytes read and the address from + /// whence the data came. + #[cfg(feature = "unstable-futures")] + pub fn recv_from2( + &mut self, + cx: &mut task::Context, + buf: &mut [u8], + ) -> futures2::Poll<(usize, SocketAddr), io::Error> { + use futures2::Async; + try_ready2!(self.io.poll_read_ready2(cx, Ready::readable())); + let r = self.io.get_ref().recv_from(buf); + if is_wouldblock(&r) { + self.io.clear_read_ready2(cx, Ready::readable())?; } - return r + r.map(Async::Ready) + } + + /// Receives data from the socket. + /// + /// On success, returns the number of bytes read. + #[cfg(feature = "unstable-futures")] + pub fn recv2( + &mut self, + cx: &mut task::Context, + buf: &mut [u8], + ) -> futures2::Poll { + use futures2::Async; + try_ready2!(self.io.poll_read_ready2(cx, Ready::readable())); + let r = self.io.get_ref().recv(buf); + if is_wouldblock(&r) { + self.io.clear_read_ready2(cx, Ready::readable())?; + } + r.map(Async::Ready) } /// Returns a future for receiving a datagram. See the documentation on RecvDgram for details. pub fn recv_dgram(self, buf: T) -> RecvDgram - where T: AsMut<[u8]> + where + T: AsMut<[u8]>, { RecvDgram { st: RecvDgramState::Receiving { @@ -671,17 +754,18 @@ impl UnixDatagram { /// Sends data on the socket to the specified address. /// /// On success, returns the number of bytes written. - pub fn send_to

(&self, buf: &[u8], path: P) -> io::Result - where P: AsRef + pub fn send_to

(&mut self, buf: &[u8], path: P) -> Poll + where + P: AsRef, { - if self.io.poll_write().is_not_ready() { - return Err(would_block()) + if self.io.poll_write_ready()?.is_not_ready() { + return Ok(Async::NotReady); } let r = self.io.get_ref().send_to(buf, path); if is_wouldblock(&r) { - self.io.need_write(); + self.io.clear_write_ready()?; } - return r + r.map(Async::Ready) } /// Sends data on the socket to the socket's peer. @@ -690,22 +774,65 @@ impl UnixDatagram { /// will return an error if the socket has not already been connected. /// /// On success, returns the number of bytes written. - pub fn send(&self, buf: &[u8]) -> io::Result { - if self.io.poll_write().is_not_ready() { - return Err(would_block()) + pub fn send(&mut self, buf: &[u8]) -> Poll { + if self.io.poll_write_ready()?.is_not_ready() { + return Ok(Async::NotReady); } let r = self.io.get_ref().send(buf); if is_wouldblock(&r) { - self.io.need_write(); + self.io.clear_write_ready()?; + } + r.map(Async::Ready) + } + + /// Sends data on the socket to the specified address. + /// + /// On success, returns the number of bytes written. + #[cfg(feature = "unstable-futures")] + pub fn send_to2

( + &mut self, + cx: &mut task::Context, + buf: &[u8], + path: P, + ) -> futures2::Poll + where + P: AsRef, + { + use futures2::Async; + try_ready2!(self.io.poll_write_ready2(cx)); + let r = self.io.get_ref().send_to(buf, path); + if is_wouldblock(&r) { + self.io.clear_write_ready2(cx)?; } - return r + r.map(Async::Ready) } + /// Sends data on the socket to the socket's peer. + /// + /// The peer address may be set by the `connect` method, and this method + /// will return an error if the socket has not already been connected. + /// + /// On success, returns the number of bytes written. + #[cfg(feature = "unstable-futures")] + pub fn send2( + &mut self, + cx: &mut task::Context, + buf: &[u8], + ) -> futures2::Poll { + use futures2::Async; + try_ready2!(self.io.poll_write_ready2(cx)); + let r = self.io.get_ref().send(buf); + if is_wouldblock(&r) { + self.io.clear_write_ready2(cx)?; + } + r.map(Async::Ready) + } /// Returns a future sending the data in buf to the socket at path. pub fn send_dgram(self, buf: T, path: P) -> SendDgram - where T: AsRef<[u8]>, - P: AsRef + where + T: AsRef<[u8]>, + P: AsRef, { SendDgram { st: SendDgramState::Sending { @@ -750,7 +877,8 @@ impl UnixDatagram { /// which will break them into separate objects, allowing them to interact /// more easily. pub fn framed(self, codec: C) -> UnixDatagramFramed - where C: UnixDatagramCodec, + where + C: UnixDatagramCodec, { frame::new(self, codec) } @@ -795,8 +923,9 @@ enum SendDgramState { } impl Future for SendDgram - where T: AsRef<[u8]>, - P: AsRef +where + T: AsRef<[u8]>, + P: AsRef, { /// Returns the underlying socket and the buffer that was sent. type Item = (UnixDatagram, T); @@ -804,17 +933,64 @@ impl Future for SendDgram type Error = io::Error; fn poll(&mut self) -> Poll { - if let SendDgramState::Sending { ref sock, ref buf, ref addr } = self.st { - let n = try_nb!(sock.send_to(buf.as_ref(), addr)); + if let SendDgramState::Sending { + ref mut sock, + ref buf, + ref addr, + } = self.st + { + let n = try_ready!(sock.send_to(buf.as_ref(), addr)); + if n < buf.as_ref().len() { + return Err(io::Error::new( + io::ErrorKind::Other, + "Couldn't send whole buffer".to_string(), + )); + } + } else { + panic!() + } + if let SendDgramState::Sending { sock, buf, addr: _ } = + mem::replace(&mut self.st, SendDgramState::Empty) + { + Ok(Async::Ready((sock, buf))) + } else { + panic!() + } + } +} + +#[cfg(feature = "unstable-futures")] +impl futures2::Future for SendDgram +where + T: AsRef<[u8]>, + P: AsRef, +{ + /// Returns the underlying socket and the buffer that was sent. + type Item = (UnixDatagram, T); + /// The error that is returned when sending failed. + type Error = io::Error; + + fn poll(&mut self, cx: &mut task::Context) -> futures2::Poll { + use futures2::Async; + if let SendDgramState::Sending { + ref mut sock, + ref buf, + ref addr, + } = self.st + { + let n = try_ready2!(sock.send_to2(cx, buf.as_ref(), addr)); if n < buf.as_ref().len() { - return Err(io::Error::new(io::ErrorKind::Other, - "Couldn't send whole buffer".to_string())); + return Err(io::Error::new( + io::ErrorKind::Other, + "Couldn't send whole buffer".to_string(), + )); } } else { panic!() } if let SendDgramState::Sending { sock, buf, addr: _ } = - mem::replace(&mut self.st, SendDgramState::Empty) { + mem::replace(&mut self.st, SendDgramState::Empty) + { Ok(Async::Ready((sock, buf))) } else { panic!() @@ -836,12 +1012,16 @@ pub struct RecvDgram { /// avoided. enum RecvDgramState { #[allow(dead_code)] - Receiving { sock: UnixDatagram, buf: T }, + Receiving { + sock: UnixDatagram, + buf: T, + }, Empty, } impl Future for RecvDgram - where T: AsMut<[u8]> +where + T: AsMut<[u8]>, { /// RecvDgram yields a tuple of the underlying socket, the receive buffer, how many bytes were /// received, and the address (path) of the peer sending the datagram. If the buffer is too small, the @@ -854,19 +1034,67 @@ impl Future for RecvDgram let received; let peer; - if let RecvDgramState::Receiving { ref sock, ref mut buf } = self.st { - let (n, p) = try_nb!(sock.recv_from(buf.as_mut())); + if let RecvDgramState::Receiving { + ref mut sock, + ref mut buf, + } = self.st + { + let (n, p) = try_ready!(sock.recv_from(buf.as_mut())); received = n; - peer = p.as_pathname().map_or(String::new(), - |p| p.to_str().map_or(String::new(), |s| s.to_string())); + peer = p.as_pathname().map_or(String::new(), |p| { + p.to_str().map_or(String::new(), |s| s.to_string()) + }); + } else { + panic!() + } + + if let RecvDgramState::Receiving { sock, buf } = + mem::replace(&mut self.st, RecvDgramState::Empty) + { + Ok(Async::Ready((sock, buf, received, peer))) + } else { + panic!() + } + } +} + +#[cfg(feature = "unstable-futures")] +impl futures2::Future for RecvDgram +where + T: AsMut<[u8]>, +{ + /// RecvDgram yields a tuple of the underlying socket, the receive buffer, how many bytes were + /// received, and the address (path) of the peer sending the datagram. If the buffer is too small, the + /// datagram is truncated. + type Item = (UnixDatagram, T, usize, String); + /// This future yields io::Error if an error occurred. + type Error = io::Error; + + fn poll(&mut self, _cx: &mut task::Context) -> futures2::Poll { + use futures2::Async; + + let received; + let peer; + + if let RecvDgramState::Receiving { + ref mut sock, + ref mut buf, + } = self.st + { + let (n, p) = try_ready2!(sock.recv_from(buf.as_mut()).map(::lift_async)); + received = n; + peer = p.as_pathname().map_or(String::new(), |p| { + p.to_str().map_or(String::new(), |s| s.to_string()) + }); } else { panic!() } - if let RecvDgramState::Receiving { sock, buf } = mem::replace(&mut self.st, - RecvDgramState::Empty) { + if let RecvDgramState::Receiving { sock, buf } = + mem::replace(&mut self.st, RecvDgramState::Empty) + { Ok(Async::Ready((sock, buf, received, peer))) } else { panic!() diff --git a/src/ucred.rs b/src/ucred.rs index 192c10b..17eda0b 100644 --- a/src/ucred.rs +++ b/src/ucred.rs @@ -1,4 +1,4 @@ -use libc::{uid_t, gid_t}; +use libc::{gid_t, uid_t}; /// Credentials of a process #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] @@ -28,7 +28,11 @@ pub mod impl_linux { unsafe { let raw_fd = sock.as_raw_fd(); - let mut ucred = ucred { pid: 0, uid: 0, gid: 0 }; + let mut ucred = ucred { + pid: 0, + uid: 0, + gid: 0, + }; let ucred_size = mem::size_of::(); @@ -37,8 +41,14 @@ pub mod impl_linux { assert!(ucred_size <= u32::max_value() as usize); let mut ucred_size = ucred_size as socklen_t; - - let ret = getsockopt(raw_fd, SOL_SOCKET, SO_PEERCRED, &mut ucred as *mut ucred as *mut c_void, &mut ucred_size); + + let ret = getsockopt( + raw_fd, + SOL_SOCKET, + SO_PEERCRED, + &mut ucred as *mut ucred as *mut c_void, + &mut ucred_size, + ); if ret == 0 && ucred_size as usize == mem::size_of::() { Ok(super::UCred { uid: ucred.uid, @@ -79,14 +89,14 @@ pub mod impl_macos { #[cfg(not(target_os = "dragonfly"))] #[cfg(test)] mod test { - use tokio_core::reactor::Core; + use tokio_reactor::Reactor; use UnixStream; use libc::geteuid; use libc::getegid; #[test] fn test_socket_pair() { - let core = Core::new().unwrap(); + let core = Reactor::new().unwrap(); let handle = core.handle(); let (a, b) = UnixStream::pair(&handle).unwrap();