diff --git a/glommio/src/io/buffered_file_stream.rs b/glommio/src/io/buffered_file_stream.rs index a461526a9..3ee25f50d 100644 --- a/glommio/src/io/buffered_file_stream.rs +++ b/glommio/src/io/buffered_file_stream.rs @@ -391,7 +391,7 @@ impl StreamWriter { bytes, self.file_pos, ); - source.add_waiter(waker); + source.add_waiter_single(waker); self.source = Some(source); true } else { @@ -410,7 +410,7 @@ impl StreamWriter { .upgrade() .unwrap() .fdatasync(self.file.as_ref().unwrap().as_raw_fd()); - source.add_waiter(cx.waker().clone()); + source.add_waiter_single(cx.waker().clone()); self.source = Some(source); Poll::Pending } @@ -430,7 +430,7 @@ impl StreamWriter { .upgrade() .unwrap() .close(self.file.as_ref().unwrap().as_raw_fd()); - source.add_waiter(cx.waker().clone()); + source.add_waiter_single(cx.waker().clone()); self.source = Some(source); Poll::Pending } @@ -498,7 +498,7 @@ macro_rules! do_seek { .upgrade() .unwrap() .statx($fileobj.as_raw_fd(), &$fileobj.path().unwrap()); - source.add_waiter($cx.waker().clone()); + source.add_waiter_single($cx.waker().clone()); $source = Some(source); Poll::Pending } @@ -586,7 +586,7 @@ impl AsyncBufRead for StreamReader { self.buffer.max_buffer_size, self.file.file.scheduler.borrow().as_ref(), ); - source.add_waiter(cx.waker().clone()); + source.add_waiter_single(cx.waker().clone()); self.io_source = Some(source); Poll::Pending } @@ -671,7 +671,7 @@ impl AsyncBufRead for Stdin { self.buffer.max_buffer_size, None, ); - source.add_waiter(cx.waker().clone()); + source.add_waiter_single(cx.waker().clone()); self.source = Some(source); Poll::Pending } diff --git a/glommio/src/io/bulk_io.rs b/glommio/src/io/bulk_io.rs index 9d3f7a6d1..b01abfe94 100644 --- a/glommio/src/io/bulk_io.rs +++ b/glommio/src/io/bulk_io.rs @@ -41,7 +41,7 @@ impl Stream for OrderedBulkIo { Poll::Pending }; if let Some((Some(source), _)) = self.iovs.front_mut() { - source.add_waiter(cx.waker().clone()); + source.add_waiter_many(cx.waker().clone()); } res } diff --git a/glommio/src/net/stream.rs b/glommio/src/net/stream.rs index 1829af363..4ad474d47 100644 --- a/glommio/src/net/stream.rs +++ b/glommio/src/net/stream.rs @@ -226,7 +226,7 @@ impl> GlommioStream { }; if source.result().is_none() { - source.add_waiter(cx.waker().clone()); + source.add_waiter_single(cx.waker().clone()); self.source_rx = Some(source); Poll::Pending } else { @@ -253,7 +253,7 @@ impl> GlommioStream { match source.result() { None => { - source.add_waiter(cx.waker().clone()); + source.add_waiter_single(cx.waker().clone()); self.source_tx = Some(source); Poll::Pending } diff --git a/glommio/src/sys/source.rs b/glommio/src/sys/source.rs index e8ddf65e8..2b4295b15 100644 --- a/glommio/src/sys/source.rs +++ b/glommio/src/sys/source.rs @@ -216,7 +216,19 @@ impl Source { .map(|x| OsResult::from(x).into()) } - pub(crate) fn add_waiter(&self, waker: Waker) { + // adds a single waiter to the list, replacing any waiter that may already + // exist. Should be used for single streams that map a future 1:1 to their I/O + // source + pub(crate) fn add_waiter_single(&self, waker: Waker) { + let mut inner = self.inner.borrow_mut(); + inner.wakers.waiters.pop(); + inner.wakers.waiters.push(waker); + debug_assert_eq!(inner.wakers.waiters.len(), 1) + } + + // adds a waiter to the list. Useful for streams that have many futures waiting + // on a single I/O source + pub(crate) fn add_waiter_many(&self, waker: Waker) { self.inner.borrow_mut().wakers.waiters.push(waker) } @@ -234,7 +246,7 @@ impl Source { return Poll::Ready(result); } - self.add_waiter(cx.waker().clone()); + self.add_waiter_many(cx.waker().clone()); Poll::Pending }) .await