Skip to content

Commit

Permalink
fix(service): Fix Service::stop not working
Browse files Browse the repository at this point in the history
There were 2 issues in the stop system for services :
- On the first poll of the stream, the shutdown future was not polled,
  thus stopping the service would never wake this task
- After the shutdown future is resolved, the next poll will still poll
  the shutdown future, which is not allowed
  • Loading branch information
tinou98 committed Nov 12, 2023
1 parent b2740d8 commit f6599c5
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions async-nats/src/service/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ impl Stream for Endpoint {
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
trace!("polling for next request");
match self.shutdown_future.as_mut() {
Some(shutdown) => match shutdown.as_mut().poll(cx) {
if let Some(mut receiver) = self.shutdown.take() {
// Need to initialize `shutdown_future` on first poll
self.shutdown_future = Some(Box::pin(async move { receiver.recv().await }));
}

if let Some(shutdown) = self.shutdown_future.as_mut() {
match shutdown.as_mut().poll(cx) {
Poll::Ready(_result) => {
debug!("got stop broadcast");
self.requests
Expand All @@ -54,16 +59,16 @@ impl Stream for Endpoint {
max: None,
})
.ok();

// Clear future, can't be resumed after completion
self.shutdown_future = None;
}
Poll::Pending => {
trace!("stop broadcast still pending");
}
},
None => {
let mut receiver = self.shutdown.take().unwrap();
self.shutdown_future = Some(Box::pin(async move { receiver.recv().await }));
}
}

trace!("checking for new messages");
match self.requests.poll_next_unpin(cx) {
Poll::Ready(message) => {
Expand Down

0 comments on commit f6599c5

Please sign in to comment.