Skip to content

Commit

Permalink
fix: Replace asserts with None checks for graceful shutdown (#1244)
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu authored Sep 9, 2024
1 parent 49c7c61 commit ced4f52
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1104,8 +1104,11 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
)

with self._pause_resume_lock:
assert self._scheduler is not None
assert self._leaser is not None
if self._scheduler is None or self._leaser is None:
_LOGGER.debug(
f"self._scheduler={self._scheduler} or self._leaser={self._leaser} is None. Stopping further processing."
)
return

for received_message in received_messages:
if (
Expand Down
56 changes: 56 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1375,6 +1375,62 @@ def test_close_blocking_scheduler_shutdown():
scheduler.shutdown.assert_called_once_with(await_msg_callbacks=True)


def test__on_response_none_scheduler():
manager, _, _, _, _, _ = make_running_manager()

manager._callback = mock.sentinel.callback
manager._scheduler = None
# Set up the messages.
response = gapic_types.StreamingPullResponse(
received_messages=[
gapic_types.ReceivedMessage(
ack_id="ack1",
message=gapic_types.PubsubMessage(data=b"foo", message_id="1"),
),
gapic_types.ReceivedMessage(
ack_id="ack2",
message=gapic_types.PubsubMessage(data=b"bar", message_id="2"),
delivery_attempt=6,
),
]
)

manager._maybe_release_messages = mock.Mock()

# adjust message bookkeeping in leaser
fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=42)
manager._on_response(response)

manager._maybe_release_messages.assert_not_called


def test__on_response_none_leaser():
manager, _, _, _, _, _ = make_running_manager()

manager._callback = mock.sentinel.callback
manager._leaser = None
# Set up the messages.
response = gapic_types.StreamingPullResponse(
received_messages=[
gapic_types.ReceivedMessage(
ack_id="ack1",
message=gapic_types.PubsubMessage(data=b"foo", message_id="1"),
),
gapic_types.ReceivedMessage(
ack_id="ack2",
message=gapic_types.PubsubMessage(data=b"bar", message_id="2"),
delivery_attempt=6,
),
]
)

manager._maybe_release_messages = mock.Mock()

manager._on_response(response)

manager._maybe_release_messages.assert_not_called


def test_close_nonblocking_scheduler_shutdown():
manager, _, _, _, _, _ = make_running_manager(await_callbacks_on_shutdown=False)
scheduler = manager._scheduler
Expand Down

0 comments on commit ced4f52

Please sign in to comment.