From efe15e38dffdd4e83db53ff88afd3d59fb65ec54 Mon Sep 17 00:00:00 2001 From: Simon Farnsworth Date: Thu, 29 Sep 2022 14:24:17 +0100 Subject: [PATCH 1/7] benchmarks for copying Before the changes to read more often: test copy_chunk_to_mem ... bench: 128,141,778 ns/iter (+/- 410,047) test copy_chunk_to_slow_hdd ... bench: 161,070,661 ns/iter (+/- 4,341,313) test copy_mem_to_mem ... bench: 13,400 ns/iter (+/- 346) test copy_mem_to_slow_hdd ... bench: 181,176,810 ns/iter (+/- 222,276) --- benches/Cargo.toml | 7 ++ benches/copy.rs | 239 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 246 insertions(+) create mode 100644 benches/copy.rs diff --git a/benches/Cargo.toml b/benches/Cargo.toml index 2b98cfd3934..78e80fb1d3c 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -7,6 +7,8 @@ edition = "2018" [dependencies] tokio = { version = "1.5.0", path = "../tokio", features = ["full"] } bencher = "0.1.5" +rand = "0.8" +rand_chacha = "0.3" [dev-dependencies] tokio-util = { version = "0.7.0", path = "../tokio-util", features = ["full"] } @@ -50,3 +52,8 @@ harness = false name = "fs" path = "fs.rs" harness = false + +[[bench]] +name = "copy" +path = "copy.rs" +harness = false diff --git a/benches/copy.rs b/benches/copy.rs new file mode 100644 index 00000000000..6778be49d96 --- /dev/null +++ b/benches/copy.rs @@ -0,0 +1,239 @@ +use bencher::{benchmark_group, benchmark_main, Bencher}; + +use rand::{Rng, SeedableRng}; +use rand_chacha::ChaCha20Rng; + +use tokio::io::{copy, repeat, AsyncRead, AsyncReadExt, AsyncWrite}; +use tokio::time::{interval, Interval, MissedTickBehavior}; + +use std::task::Poll; +use std::time::Duration; + +const KILO: usize = 1024; + +// Tunable parameters if you want to change this benchmark. If reader and writer +// are matched in kilobytes per second, then this only exposes buffering to the +// benchmark. +const RNG_SEED: u64 = 0; +// How much data to copy in a single benchmark run +const SOURCE_SIZE: u64 = 256 * KILO as u64; +// Read side provides CHUNK_SIZE every READ_SERVICE_PERIOD. If it's not called +// frequently, it'll burst to catch up (representing OS buffers draining) +const CHUNK_SIZE: usize = 2 * KILO; +const READ_SERVICE_PERIOD: Duration = Duration::from_millis(1); +// Write side buffers up to WRITE_BUFFER, and flushes to disk every +// WRITE_SERVICE_PERIOD. +const WRITE_BUFFER: usize = 40 * KILO; +const WRITE_SERVICE_PERIOD: Duration = Duration::from_millis(20); +// How likely you are to have to wait for previously written data to be flushed +// because another writer claimed the buffer space +const PROBABILITY_FLUSH_WAIT: f64 = 0.1; + +/// A slow writer that aims to simulate HDD behaviour under heavy load. +/// +/// There is a limited buffer, which is fully drained on the next write after +/// a time limit is reached. Flush waits for the time limit to be reached +/// and then drains the buffer. +/// +/// At random, the HDD will stall writers while it flushes out all buffers. If +/// this happens to you, you will be unable to write until the next time the +/// buffer is drained. +struct SlowHddWriter { + service_intervals: Interval, + blocking_rng: ChaCha20Rng, + buffer_size: usize, + buffer_used: usize, +} + +impl SlowHddWriter { + fn new(service_interval: Duration, buffer_size: usize) -> Self { + let blocking_rng = ChaCha20Rng::seed_from_u64(RNG_SEED); + let mut service_intervals = interval(service_interval); + service_intervals.set_missed_tick_behavior(MissedTickBehavior::Delay); + Self { + service_intervals, + blocking_rng, + buffer_size, + buffer_used: 0, + } + } + + fn service_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // If we hit a service interval, the buffer can be cleared + let res = self.service_intervals.poll_tick(cx).map(|_| Ok(())); + if let Poll::Ready(_) = res { + self.buffer_used = 0; + } + res + } + + fn write_bytes( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + writeable: usize, + ) -> std::task::Poll> { + let service_res = self.as_mut().service_write(cx); + + if service_res.is_pending() && self.blocking_rng.gen_bool(PROBABILITY_FLUSH_WAIT) { + return Poll::Pending; + } + let available = self.buffer_size - self.buffer_used; + + if available == 0 { + assert!(service_res.is_pending()); + Poll::Pending + } else { + let written = available.min(writeable); + self.buffer_used += written; + Poll::Ready(Ok(written)) + } + } +} + +impl Unpin for SlowHddWriter {} + +impl AsyncWrite for SlowHddWriter { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + self.write_bytes(cx, buf.len()) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.service_write(cx) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.service_write(cx) + } + + fn poll_write_vectored( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> std::task::Poll> { + let writeable = bufs.into_iter().fold(0, |acc, buf| acc + buf.len()); + self.write_bytes(cx, writeable) + } + + fn is_write_vectored(&self) -> bool { + true + } +} + +/// A reader that limits the maximum chunk it'll give you back +/// +/// Simulates something reading from a slow link - you get one chunk per call, +/// and you are offered chunks on a schedule +struct ChunkReader { + data: Vec, + service_intervals: Interval, +} + +impl ChunkReader { + fn new(chunk_size: usize, service_interval: Duration) -> Self { + let mut service_intervals = interval(service_interval); + service_intervals.set_missed_tick_behavior(MissedTickBehavior::Burst); + let data: Vec = std::iter::repeat(0).take(chunk_size).collect(); + Self { + data, + service_intervals, + } + } +} + +impl AsyncRead for ChunkReader { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + if self.service_intervals.poll_tick(cx).is_pending() { + return Poll::Pending; + } + buf.put_slice(&self.data[..buf.remaining().min(self.data.len())]); + Poll::Ready(Ok(())) + } +} + +fn rt() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_time() + .build() + .unwrap() +} + +fn copy_mem_to_mem(b: &mut Bencher) { + let rt = rt(); + + b.iter(|| { + let task = || async { + let mut source = repeat(0).take(SOURCE_SIZE); + let mut dest = Vec::new(); + copy(&mut source, &mut dest).await.unwrap(); + }; + + rt.block_on(task()); + }) +} + +fn copy_mem_to_slow_hdd(b: &mut Bencher) { + let rt = rt(); + + b.iter(|| { + let task = || async { + let mut source = repeat(0).take(SOURCE_SIZE); + let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER); + copy(&mut source, &mut dest).await.unwrap(); + }; + + rt.block_on(task()); + }) +} + +fn copy_chunk_to_mem(b: &mut Bencher) { + let rt = rt(); + b.iter(|| { + let task = || async { + let mut source = ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE); + let mut dest = Vec::new(); + copy(&mut source, &mut dest).await.unwrap(); + }; + + rt.block_on(task()); + }) +} + +fn copy_chunk_to_slow_hdd(b: &mut Bencher) { + let rt = rt(); + b.iter(|| { + let task = || async { + let mut source = ChunkReader::new(CHUNK_SIZE, READ_SERVICE_PERIOD).take(SOURCE_SIZE); + let mut dest = SlowHddWriter::new(WRITE_SERVICE_PERIOD, WRITE_BUFFER); + copy(&mut source, &mut dest).await.unwrap(); + }; + + rt.block_on(task()); + }) +} + +benchmark_group!( + copy_bench, + copy_mem_to_mem, + copy_mem_to_slow_hdd, + copy_chunk_to_mem, + copy_chunk_to_slow_hdd, +); +benchmark_main!(copy_bench); From 217d6e2b26bbe16e4a49857e0632d5638b7075fb Mon Sep 17 00:00:00 2001 From: Simon Farnsworth Date: Wed, 28 Sep 2022 10:09:11 +0100 Subject: [PATCH 2/7] make `copy` read whenever it can't write until the buffer is full After this: test copy_chunk_to_mem ... bench: 128,117,779 ns/iter (+/- 443,280) test copy_chunk_to_slow_hdd ... bench: 141,173,126 ns/iter (+/- 348,824) test copy_mem_to_mem ... bench: 13,853 ns/iter (+/- 291) test copy_mem_to_slow_hdd ... bench: 181,171,138 ns/iter (+/- 343,375) --- tokio/src/io/util/copy.rs | 55 ++++++++++++++++++++++++++++++++------- 1 file changed, 45 insertions(+), 10 deletions(-) diff --git a/tokio/src/io/util/copy.rs b/tokio/src/io/util/copy.rs index d0ab7cb1409..d21e9610ebe 100644 --- a/tokio/src/io/util/copy.rs +++ b/tokio/src/io/util/copy.rs @@ -27,6 +27,46 @@ impl CopyBuffer { } } + fn poll_fill_buf( + &mut self, + cx: &mut Context<'_>, + reader: Pin<&mut R>, + ) -> Poll> + where + R: AsyncRead + ?Sized, + { + let me = &mut *self; + let mut buf = ReadBuf::new(&mut me.buf[me.cap..]); + + let res = reader.poll_read(cx, &mut buf); + me.cap += buf.filled().len(); + res + } + + fn poll_write_buf( + &mut self, + cx: &mut Context<'_>, + mut reader: Pin<&mut R>, + mut writer: Pin<&mut W>, + ) -> Poll> + where + R: AsyncRead + ?Sized, + W: AsyncWrite + ?Sized, + { + let me = &mut *self; + match writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap]) { + Poll::Pending => { + // Top up the buffer towards full if we can read a bit more + // data - this should improve the chances of a large write + if me.cap != me.buf.len() { + ready!(me.poll_fill_buf(cx, reader.as_mut()))?; + } + Poll::Pending + } + res => return res, + } + } + pub(super) fn poll_copy( &mut self, cx: &mut Context<'_>, @@ -41,10 +81,10 @@ impl CopyBuffer { // If our buffer is empty, then we need to read some data to // continue. if self.pos == self.cap && !self.read_done { - let me = &mut *self; - let mut buf = ReadBuf::new(&mut me.buf); + self.pos = 0; + self.cap = 0; - match reader.as_mut().poll_read(cx, &mut buf) { + match self.poll_fill_buf(cx, reader.as_mut()) { Poll::Ready(Ok(_)) => (), Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), Poll::Pending => { @@ -59,19 +99,14 @@ impl CopyBuffer { } } - let n = buf.filled().len(); - if n == 0 { + if self.cap == 0 { self.read_done = true; - } else { - self.pos = 0; - self.cap = n; } } // If our buffer has some data, let's write it out! while self.pos < self.cap { - let me = &mut *self; - let i = ready!(writer.as_mut().poll_write(cx, &me.buf[me.pos..me.cap]))?; + let i = ready!(self.poll_write_buf(cx, reader.as_mut(), writer.as_mut()))?; if i == 0 { return Poll::Ready(Err(io::Error::new( io::ErrorKind::WriteZero, From 79c78e573a7bd9b0a95b36a2849ebe445eb14323 Mon Sep 17 00:00:00 2001 From: Simon Farnsworth Date: Sat, 1 Oct 2022 19:58:17 +0100 Subject: [PATCH 3/7] clippy --- tokio/src/io/util/copy.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/io/util/copy.rs b/tokio/src/io/util/copy.rs index d21e9610ebe..73816fdd1ea 100644 --- a/tokio/src/io/util/copy.rs +++ b/tokio/src/io/util/copy.rs @@ -63,7 +63,7 @@ impl CopyBuffer { } Poll::Pending } - res => return res, + res => res, } } From 44adc6293cb3d9b7a17e47c7e629a5b19baab95f Mon Sep 17 00:00:00 2001 From: Simon Farnsworth Date: Sat, 1 Oct 2022 20:01:45 +0100 Subject: [PATCH 4/7] Don't continue reading after read done, even in the write side --- tokio/src/io/util/copy.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/io/util/copy.rs b/tokio/src/io/util/copy.rs index 73816fdd1ea..9bef87818d5 100644 --- a/tokio/src/io/util/copy.rs +++ b/tokio/src/io/util/copy.rs @@ -58,7 +58,7 @@ impl CopyBuffer { Poll::Pending => { // Top up the buffer towards full if we can read a bit more // data - this should improve the chances of a large write - if me.cap != me.buf.len() { + if !me.read_done && me.cap != me.buf.len() { ready!(me.poll_fill_buf(cx, reader.as_mut()))?; } Poll::Pending From d1ac1688d81c7c4c1bb06576e6f8700544398c69 Mon Sep 17 00:00:00 2001 From: Simon Farnsworth Date: Mon, 3 Oct 2022 09:34:01 +0100 Subject: [PATCH 5/7] Take @Darksonn's review suggestions --- benches/copy.rs | 3 +-- tokio/src/io/util/copy.rs | 13 ++++++------- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/benches/copy.rs b/benches/copy.rs index 6778be49d96..ae02ad5a7d9 100644 --- a/benches/copy.rs +++ b/benches/copy.rs @@ -168,8 +168,7 @@ impl AsyncRead for ChunkReader { } fn rt() -> tokio::runtime::Runtime { - tokio::runtime::Builder::new_multi_thread() - .worker_threads(2) + tokio::runtime::Builder::new_current_thread() .enable_time() .build() .unwrap() diff --git a/tokio/src/io/util/copy.rs b/tokio/src/io/util/copy.rs index 9bef87818d5..f0215443b4d 100644 --- a/tokio/src/io/util/copy.rs +++ b/tokio/src/io/util/copy.rs @@ -36,10 +36,13 @@ impl CopyBuffer { R: AsyncRead + ?Sized, { let me = &mut *self; - let mut buf = ReadBuf::new(&mut me.buf[me.cap..]); + let mut buf = ReadBuf::new(&mut me.buf); + buf.set_filled(me.cap); let res = reader.poll_read(cx, &mut buf); - me.cap += buf.filled().len(); + let filled_len = buf.filled().len(); + me.read_done = me.cap == filled_len; + me.cap = filled_len; res } @@ -58,7 +61,7 @@ impl CopyBuffer { Poll::Pending => { // Top up the buffer towards full if we can read a bit more // data - this should improve the chances of a large write - if !me.read_done && me.cap != me.buf.len() { + if !me.read_done && me.cap < me.buf.len() { ready!(me.poll_fill_buf(cx, reader.as_mut()))?; } Poll::Pending @@ -98,10 +101,6 @@ impl CopyBuffer { return Poll::Pending; } } - - if self.cap == 0 { - self.read_done = true; - } } // If our buffer has some data, let's write it out! From dadb7c22acc37981e5f4b6f172dfa90f49acc2b9 Mon Sep 17 00:00:00 2001 From: Simon Farnsworth Date: Mon, 3 Oct 2022 10:30:21 +0100 Subject: [PATCH 6/7] oops - if the read doesn't claim success, the buffer shouldn't change --- tokio/src/io/util/copy.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tokio/src/io/util/copy.rs b/tokio/src/io/util/copy.rs index f0215443b4d..f8801072fdd 100644 --- a/tokio/src/io/util/copy.rs +++ b/tokio/src/io/util/copy.rs @@ -40,9 +40,11 @@ impl CopyBuffer { buf.set_filled(me.cap); let res = reader.poll_read(cx, &mut buf); - let filled_len = buf.filled().len(); - me.read_done = me.cap == filled_len; - me.cap = filled_len; + if let Poll::Ready(_) = res { + let filled_len = buf.filled().len(); + me.read_done = me.cap == filled_len; + me.cap = filled_len; + } res } From 4299c850a7db7af2cae81d09eda160073b31a615 Mon Sep 17 00:00:00 2001 From: Simon Farnsworth Date: Mon, 3 Oct 2022 10:33:27 +0100 Subject: [PATCH 7/7] check success, not progress --- tokio/src/io/util/copy.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/io/util/copy.rs b/tokio/src/io/util/copy.rs index f8801072fdd..47dad89c76c 100644 --- a/tokio/src/io/util/copy.rs +++ b/tokio/src/io/util/copy.rs @@ -40,7 +40,7 @@ impl CopyBuffer { buf.set_filled(me.cap); let res = reader.poll_read(cx, &mut buf); - if let Poll::Ready(_) = res { + if let Poll::Ready(Ok(_)) = res { let filled_len = buf.filled().len(); me.read_done = me.cap == filled_len; me.cap = filled_len;