diff --git a/profile.sh b/profile.sh index e4ecdcd..eb0dda6 100755 --- a/profile.sh +++ b/profile.sh @@ -9,14 +9,16 @@ sudo date > /dev/null # benchmark echo "==> profile" sudo perf record -c 20000 --call-graph=dwarf -g -o tokio.perf.data target/release/tokio -sudo perf record -c 20000 --call-graph=dwarf -g -o tokio-proto.perf.data target/release/tokio-proto +sudo perf record -c 20000 --call-graph=dwarf -g -o tokio-proto-pipeline.perf.data target/release/tokio-proto-pipeline +sudo perf record -c 20000 --call-graph=dwarf -g -o tokio-proto-multiplex.perf.data target/release/tokio-proto-multiplex sudo perf record -c 20000 --call-graph=dwarf -g -o tarpc.perf.data target/release/tarpc # make pretty flamegraphs echo "==> make flamegraphs" sudo perf script -i tokio.perf.data | stackcollapse-perf | ./unmangle.sh | flamegraph > tokio.svg -sudo perf script -i tokio-proto.perf.data | stackcollapse-perf | ./unmangle.sh | flamegraph > tokio-proto.svg +sudo perf script -i tokio-proto-pipeline.perf.data | stackcollapse-perf | ./unmangle.sh | flamegraph > tokio-proto-pipeline.svg +sudo perf script -i tokio-proto-multiplex.perf.data | stackcollapse-perf | ./unmangle.sh | flamegraph > tokio-proto-multiplex.svg sudo perf script -i tarpc.perf.data | stackcollapse-perf | ./unmangle.sh | flamegraph > tarpc.svg -echo "flamegraph: open [tokio|tokio-proto|tarpc].svg" -echo "perf report: sudo perf report -g --no-children -i [tokio|tokio-proto|tarpc].perf.data" +echo "flamegraph: open [tokio|tokio-proto-(multiplex|pipeline)|tarpc].svg" +echo "perf report: sudo perf report -g --no-children -i [tokio|tokio-proto-(multiplex|pipeline)|tarpc].perf.data" diff --git a/src/bin/tokio-proto.rs b/src/bin/tokio-proto-multiplex.rs similarity index 97% rename from src/bin/tokio-proto.rs rename to src/bin/tokio-proto-multiplex.rs index 140ee48..7cd2eba 100644 --- a/src/bin/tokio-proto.rs +++ b/src/bin/tokio-proto-multiplex.rs @@ -40,7 +40,9 @@ impl Codec for IntCodec { // message is available; returns `Ok(None)` if the buffer does not yet // hold a complete message. fn decode(&mut self, buf: &mut EasyBuf) -> io::Result> { - if buf.len() < 2 * mem::size_of::() { return Ok(None); } + if buf.len() < 2 * mem::size_of::() { + return Ok(None); + } let mut id_buf = buf.drain_to(mem::size_of::()); let id = Cursor::new(&mut id_buf).read_u64::()?; let mut item_buf = buf.drain_to(mem::size_of::()); @@ -148,6 +150,6 @@ fn main() { core.run(client.call(1)).unwrap(); } - println!("tokio-proto {:.0}µs/call", + println!("tokio-proto-multiplex {:.0}µs/call", dur_to_ns!(start.elapsed()) as f64 / n as f64 / 1000.0); } diff --git a/src/bin/tokio-proto-pipeline.rs b/src/bin/tokio-proto-pipeline.rs new file mode 100644 index 0000000..21daa5a --- /dev/null +++ b/src/bin/tokio-proto-pipeline.rs @@ -0,0 +1,154 @@ +#[macro_use] +extern crate tarpc_bench; + +extern crate futures; +extern crate tokio_core; +extern crate tokio_proto; +extern crate tokio_service; +extern crate net2; + +use std::str; +use std::io::{self, ErrorKind, Write}; + +use futures::{future, Future, BoxFuture}; +use tokio_core::io::{Io, Codec, Framed, EasyBuf}; +use tokio_core::net::TcpListener; +use tokio_proto::pipeline::{ServerProto, ClientProto}; +use tokio_proto::TcpClient; +use tokio_service::Service; +use tokio_core::reactor::Core; +use futures::stream::Stream; +use tokio_proto::BindServer; +use std::net::SocketAddr; + +// First, we implement a *codec*, which provides a way of encoding and +// decoding messages for the protocol. See the documentation for `Codec` in +// `tokio-core` for more details on how that works. + +#[derive(Default)] +pub struct IntCodec; + +fn parse_u64(from: &[u8]) -> Result { + Ok(str::from_utf8(from).map_err(|e| io::Error::new(ErrorKind::InvalidData, e))? + .parse() + .map_err(|e| io::Error::new(ErrorKind::InvalidData, e))?) +} + +impl Codec for IntCodec { + type In = u64; + type Out = u64; + + // Attempt to decode a message from the given buffer if a complete + // message is available; returns `Ok(None)` if the buffer does not yet + // hold a complete message. + fn decode(&mut self, buf: &mut EasyBuf) -> Result, io::Error> { + if let Some(i) = buf.as_slice().iter().position(|&b| b == b'\n') { + // remove the line, including the '\n', from the buffer + let full_line = buf.drain_to(i + 1); + + // strip the'`\n' + let slice = &full_line.as_slice()[..i]; + + Ok(Some(parse_u64(slice)?)) + } else { + Ok(None) + } + } + + // Attempt to decode a message assuming that the given buffer contains + // *all* remaining input data. + fn decode_eof(&mut self, buf: &mut EasyBuf) -> io::Result { + let amt = buf.len(); + Ok(parse_u64(buf.drain_to(amt).as_slice())?) + } + + fn encode(&mut self, item: u64, into: &mut Vec) -> io::Result<()> { + writeln!(into, "{}", item).map(|_| ()) + } +} + +// Next, we implement the server protocol, which just hooks up the codec above. + +pub struct IntProto; + +impl ServerProto for IntProto { + type Request = u64; + type Response = u64; + type Transport = Framed; + type BindTransport = Result; + + fn bind_transport(&self, io: T) -> Self::BindTransport { + Ok(io.framed(IntCodec)) + } +} + +impl ClientProto for IntProto { + type Request = u64; + type Response = u64; + type Transport = Framed; + type BindTransport = Result; + + fn bind_transport(&self, io: T) -> Self::BindTransport { + Ok(io.framed(IntCodec)) + } +} + +// Now we implement a service we'd like to run on top of this protocol + +pub struct Doubler; + +impl Service for Doubler { + type Request = u64; + type Response = u64; + type Error = io::Error; + type Future = BoxFuture; + + fn call(&self, req: u64) -> Self::Future { + // Just return the request, doubled + future::finished(req * 2).boxed() + } +} + +// Finally, we can actually host this service locally! +fn main() { + use std::time; + let n = 100000; + + let addr = "127.0.0.1:12345".parse().unwrap(); + + let mut core = Core::new().unwrap(); + let handle = core.handle(); + + // start server + // thread::spawn(move || TcpServer::new(IntProto, addr).serve(|| Ok(Doubler))); + let listener = match addr { + SocketAddr::V4(_) => net2::TcpBuilder::new_v4().unwrap(), + SocketAddr::V6(_) => net2::TcpBuilder::new_v6().unwrap(), + }; + listener.reuse_address(true).unwrap(); + listener.bind(addr).unwrap(); + let server = listener.listen(1024) + .and_then(|l| TcpListener::from_listener(l, &addr, &handle)) + .unwrap() + .incoming() + .for_each(move |(socket, _)| { + IntProto.bind_server(&handle, socket, Doubler); + Ok(()) + }) + .map_err(|_| ()); + + core.handle().spawn(server); + + // connect with client + let handle = core.handle(); + let client = core.run(TcpClient::new(IntProto).connect(&addr, &handle)).unwrap(); + + // benchmark + let start = time::Instant::now(); + for _ in 0..n { + core.run(client.call(1)).unwrap(); + } + + println!("tokio-proto-pipeline {:.0}µs/call", + dur_to_ns!(start.elapsed()) as f64 / n as f64 / 1000.0); +}