Skip to content

Commit

Permalink
make a stronger distinction between add_waiter use cases
Browse files Browse the repository at this point in the history
It used to be the case where we would assume there was a single waiter
per source, but in the read_many work we moved that to many-waiters.

The function that manipulates the wakers was kept the same, so whether
or not we had many wakers was entirely up to the user.

However I just found a use case (issue DataDog#391) where there are many
waiters added to a source that only expects a single waiter. That's
a use case in which we reuse a source, and we keep calling the poll
function into a stream even though the stream is never ready.

It's not clear to me (yet) why this is the case. It is certainly
surprising. While I want to get to the bottom of this, it is not a bad
idea to require the user to ask for their intentions explicitly.

In the future, if we can indeed guarantee that a function with a single
waiter should be empty we can use this opportunity for a stronger
assert. For now, this reverts the old behavior in the original users
and at least gets rid of this particular regression

Fixes DataDog#391
  • Loading branch information
Glauber Costa committed Aug 4, 2021
1 parent c781af1 commit 503becb
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 11 deletions.
12 changes: 6 additions & 6 deletions glommio/src/io/buffered_file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ impl StreamWriter {
bytes,
self.file_pos,
);
source.add_waiter(waker);
source.add_waiter_single(waker);
self.source = Some(source);
true
} else {
Expand All @@ -410,7 +410,7 @@ impl StreamWriter {
.upgrade()
.unwrap()
.fdatasync(self.file.as_ref().unwrap().as_raw_fd());
source.add_waiter(cx.waker().clone());
source.add_waiter_single(cx.waker().clone());
self.source = Some(source);
Poll::Pending
}
Expand All @@ -430,7 +430,7 @@ impl StreamWriter {
.upgrade()
.unwrap()
.close(self.file.as_ref().unwrap().as_raw_fd());
source.add_waiter(cx.waker().clone());
source.add_waiter_single(cx.waker().clone());
self.source = Some(source);
Poll::Pending
}
Expand Down Expand Up @@ -498,7 +498,7 @@ macro_rules! do_seek {
.upgrade()
.unwrap()
.statx($fileobj.as_raw_fd(), &$fileobj.path().unwrap());
source.add_waiter($cx.waker().clone());
source.add_waiter_single($cx.waker().clone());
$source = Some(source);
Poll::Pending
}
Expand Down Expand Up @@ -586,7 +586,7 @@ impl AsyncBufRead for StreamReader {
self.buffer.max_buffer_size,
self.file.file.scheduler.borrow().as_ref(),
);
source.add_waiter(cx.waker().clone());
source.add_waiter_single(cx.waker().clone());
self.io_source = Some(source);
Poll::Pending
}
Expand Down Expand Up @@ -671,7 +671,7 @@ impl AsyncBufRead for Stdin {
self.buffer.max_buffer_size,
None,
);
source.add_waiter(cx.waker().clone());
source.add_waiter_single(cx.waker().clone());
self.source = Some(source);
Poll::Pending
}
Expand Down
2 changes: 1 addition & 1 deletion glommio/src/io/bulk_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl<U: Copy + Unpin> Stream for OrderedBulkIo<U> {
Poll::Pending
};
if let Some((Some(source), _)) = self.iovs.front_mut() {
source.add_waiter(cx.waker().clone());
source.add_waiter_many(cx.waker().clone());
}
res
}
Expand Down
4 changes: 2 additions & 2 deletions glommio/src/net/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl<S: FromRawFd + AsRawFd + From<socket2::Socket>> GlommioStream<S> {
};

if source.result().is_none() {
source.add_waiter(cx.waker().clone());
source.add_waiter_single(cx.waker().clone());
self.source_rx = Some(source);
Poll::Pending
} else {
Expand All @@ -253,7 +253,7 @@ impl<S: FromRawFd + AsRawFd + From<socket2::Socket>> GlommioStream<S> {

match source.result() {
None => {
source.add_waiter(cx.waker().clone());
source.add_waiter_single(cx.waker().clone());
self.source_tx = Some(source);
Poll::Pending
}
Expand Down
16 changes: 14 additions & 2 deletions glommio/src/sys/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,19 @@ impl Source {
.map(|x| OsResult::from(x).into())
}

pub(crate) fn add_waiter(&self, waker: Waker) {
// adds a single waiter to the list, replacing any waiter that may already
// exist. Should be used for single streams that map a future 1:1 to their I/O
// source
pub(crate) fn add_waiter_single(&self, waker: Waker) {
let mut inner = self.inner.borrow_mut();
inner.wakers.waiters.pop();
inner.wakers.waiters.push(waker);
debug_assert_eq!(inner.wakers.waiters.len(), 1)
}

// adds a waiter to the list. Useful for streams that have many futures waiting
// on a single I/O source
pub(crate) fn add_waiter_many(&self, waker: Waker) {
self.inner.borrow_mut().wakers.waiters.push(waker)
}

Expand All @@ -234,7 +246,7 @@ impl Source {
return Poll::Ready(result);
}

self.add_waiter(cx.waker().clone());
self.add_waiter_many(cx.waker().clone());
Poll::Pending
})
.await
Expand Down

0 comments on commit 503becb

Please sign in to comment.