Skip to content

Commit

Permalink
feat: Implement futures-0.2 support
Browse files Browse the repository at this point in the history
  • Loading branch information
Markus Westerlind committed Apr 20, 2018
1 parent f2ad0e0 commit 3c86d22
Show file tree
Hide file tree
Showing 4 changed files with 468 additions and 0 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ matrix:

script:
- cargo test
- cargo test --features unstable-futures

env:
global:
Expand Down
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ appveyor = { repository = "alexcrichton/tokio-uds" }
[dependencies]
bytes = "0.4"
futures = "0.1"
futures-core = { version = "=0.2.0-beta", optional = true }
futures-io = { version = "=0.2.0-beta", optional = true }
futures-sink = { version = "=0.2.0-beta", optional = true }
iovec = "0.1"
libc = "0.2"
log = "0.4"
Expand All @@ -26,3 +29,5 @@ mio-uds = "0.6.4"
tokio-reactor = "0.1.1"
tokio-io = "0.1"

[features]
unstable-futures = ["futures-core", "futures-io", "futures-sink", "tokio-reactor/unstable-futures"]
66 changes: 66 additions & 0 deletions src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ use std::path::PathBuf;

use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream};

#[cfg(feature = "unstable-futures")]
use futures2::{self, task};
#[cfg(feature = "unstable-futures")]
use futures_sink;

use UnixDatagram;

/// Encoding of frames via buffers.
Expand Down Expand Up @@ -80,6 +85,20 @@ impl<C: UnixDatagramCodec> Stream for UnixDatagramFramed<C> {
}
}

#[cfg(feature = "unstable-futures")]
impl<C: UnixDatagramCodec> futures2::Stream for UnixDatagramFramed<C> {
type Item = C::In;
type Error = io::Error;

fn poll_next(&mut self, cx: &mut task::Context) -> futures2::Poll<Option<C::In>, io::Error> {
let (n, addr) = try_ready2!(self.socket.recv_from2(cx, &mut self.rd));
trace!("received {} bytes, decoding", n);
let frame = try!(self.codec.decode(&addr, &self.rd[..n]));
trace!("frame decoded from buffer");
Ok(futures2::Async::Ready(Some(frame)))
}
}

impl<C: UnixDatagramCodec> Sink for UnixDatagramFramed<C> {
type SinkItem = C::Out;
type SinkError = io::Error;
Expand Down Expand Up @@ -124,6 +143,53 @@ impl<C: UnixDatagramCodec> Sink for UnixDatagramFramed<C> {
}
}

#[cfg(feature = "unstable-futures")]
impl<C: UnixDatagramCodec> futures_sink::Sink for UnixDatagramFramed<C> {
type SinkItem = C::Out;
type SinkError = io::Error;

fn poll_ready(&mut self, cx: &mut task::Context) -> futures2::Poll<(), io::Error> {
if self.wr.len() > 0 {
try!(self.poll_flush(cx));
if self.wr.len() > 0 {
return Ok(futures2::Async::Pending);
}
}
Ok(().into())
}

fn start_send(&mut self, item: C::Out) -> Result<(), io::Error> {
self.out_addr = try!(self.codec.encode(item, &mut self.wr));
Ok(())
}

fn poll_flush(&mut self, cx: &mut task::Context) -> futures2::Poll<(), io::Error> {
trace!("flushing framed transport");

if self.wr.is_empty() {
return Ok(futures2::Async::Ready(()));
}

trace!("writing; remaining={}", self.wr.len());
let n = try_ready2!(self.socket.send_to2(cx, &self.wr, &self.out_addr));
trace!("written {}", n);
let wrote_all = n == self.wr.len();
self.wr.clear();
if wrote_all {
Ok(futures2::Async::Ready(()))
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"failed to write entire datagram to socket",
))
}
}

fn poll_close(&mut self, cx: &mut task::Context) -> futures2::Poll<(), io::Error> {
self.poll_flush(cx)
}
}

pub fn new<C: UnixDatagramCodec>(socket: UnixDatagram, codec: C) -> UnixDatagramFramed<C> {
UnixDatagramFramed {
socket: socket,
Expand Down
Loading

0 comments on commit 3c86d22

Please sign in to comment.