Skip to content

Commit

Permalink
Decrement the count of outstanding tasks before returning in the even…
Browse files Browse the repository at this point in the history
…t of an exception. (#406)
  • Loading branch information
ZHANG-EH authored Aug 27, 2024
1 parent c8084ad commit 62e5e5b
Showing 1 changed file with 2 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ async def _process_send(self, message_envelope: SendMessageEnvelope) -> None:
)
except BaseException as e:
message_envelope.future.set_exception(e)
self._outstanding_tasks.decrement()
return

self._message_queue.append(
Expand Down Expand Up @@ -327,6 +328,7 @@ async def _process_publish(self, message_envelope: PublishMessageEnvelope) -> No
except BaseException as e:
# Ignore cancelled errors from logs
if isinstance(e, CancelledError):
self._outstanding_tasks.decrement()
return
logger.error("Error processing publish message", exc_info=True)
finally:
Expand Down

0 comments on commit 62e5e5b

Please sign in to comment.