Skip to content

Commit

Permalink
Remvoe box from pipeline, similar tokio
Browse files Browse the repository at this point in the history
  • Loading branch information
alexcrichton committed Mar 1, 2017
1 parent b047f33 commit 5fba67f
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 15 deletions.
4 changes: 2 additions & 2 deletions src/bin/tokio-proto-pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,11 @@ impl Service for Doubler {
type Request = u64;
type Response = u64;
type Error = io::Error;
type Future = BoxFuture<u64, io::Error>;
type Future = future::FutureResult<u64, io::Error>;

fn call(&self, req: u64) -> Self::Future {
// Just return the request, doubled
future::finished(req * 2).boxed()
future::ok(req * 2)
}
}

Expand Down
71 changes: 58 additions & 13 deletions src/bin/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ extern crate tarpc_bench;
extern crate futures;
extern crate tokio_core;

use futures::{Future, Stream};
use std::io::{self, Read, Write};
use std::str;

use futures::{Future, Stream, Poll, Async};
use tokio_core::io::{copy, Io, write_all, read_exact};
use tokio_core::net::{TcpListener, TcpStream};
use tokio_core::reactor::Core;
Expand All @@ -22,10 +25,15 @@ fn main() {
// server just copies
let server = sock.incoming()
.for_each(move |(sock, _)| {
//sock.set_nodelay(true).unwrap();
let (reader, writer) = sock.split();
let echo = copy(reader, writer).then(move |_| Ok(()));
handle.spawn(echo);
handle.spawn(MyServer {
socket: sock,
buf: String::new(),
output: Vec::new(),
});
// //sock.set_nodelay(true).unwrap();
// let (reader, writer) = sock.split();
// let echo = copy(reader, writer).then(move |_| Ok(()));
// handle.spawn(echo);
Ok(())
})
.map_err(|_| ());
Expand All @@ -35,21 +43,58 @@ fn main() {
let client = TcpStream::connect(&addr, &core.handle());
let client = core.run(client).unwrap();
//client.set_nodelay(true).unwrap();
let (r, w) = client.split();
let mut r = Some(r);
let mut w = Some(w);
let (mut r, mut w) = client.split();

let start = time::Instant::now();
let mut buf = Vec::from("foobar".as_bytes());
let mut buf = [0; 2];
for _ in 0..n {
let w_ = w.take().unwrap();
w = Some(core.run(write_all(w_, &buf)).unwrap().0);
let r_ = r.take().unwrap();
r = Some(core.run(read_exact(r_, &mut buf)).unwrap().0);
w = core.run(write_all(w, b"1\n")).unwrap().0;
r = core.run(read_exact(r, &mut buf)).unwrap().0;
assert!(buf[1] == b'\n');
assert_eq!(str::from_utf8(&buf[..1]).unwrap().parse::<u64>().unwrap(), 2u64);
}
drop(r);
drop(w);

println!("tokio {:.0}µs/call",
dur_to_ns!(start.elapsed()) as f64 / n as f64 / 1000.0);
}

struct MyServer {
socket: TcpStream,
buf: String,
output: Vec<u8>,
}

impl Future for MyServer {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
loop {
while self.output.len() > 0 {
match self.socket.write(&self.output) {
Ok(n) => { self.output.drain(..n); }
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
Err(e) => panic!("write error: {}", e),
}
}
if self.buf.find("\n").is_none() {
match self.socket.read_to_string(&mut self.buf) {
Ok(0) if self.buf.len() == 0 => return Ok(().into()),
Ok(_) => {}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
Err(e) => panic!("error: {}", e),
}
}
let i = match self.buf.find("\n") {
Some(i) => i,
None => return Ok(Async::NotReady),
};

let integer: u64 = self.buf[..i].parse().unwrap();
self.buf.drain(..i+1);
writeln!(self.output, "{}", i * 2);
}
}
}

0 comments on commit 5fba67f

Please sign in to comment.