diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 21c1bab7b..89dc93e74 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -989,7 +989,7 @@ def _get_initial_request( return request def _send_lease_modacks( - self, ack_ids: Iterable[str], ack_deadline: float + self, ack_ids: Iterable[str], ack_deadline: float, warn_on_invalid=True ) -> List[str]: exactly_once_enabled = False with self._exactly_once_enabled_lock: @@ -1010,10 +1010,14 @@ def _send_lease_modacks( assert req.future is not None req.future.result() except AcknowledgeError as ack_error: - _LOGGER.warning( - "AcknowledgeError when lease-modacking a message.", - exc_info=True, - ) + if ( + ack_error.error_code != AcknowledgeStatus.INVALID_ACK_ID + or warn_on_invalid + ): + _LOGGER.warning( + "AcknowledgeError when lease-modacking a message.", + exc_info=True, + ) if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID: expired_ack_ids.append(req.ack_id) return expired_ack_ids @@ -1078,7 +1082,11 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: # modack the messages we received, as this tells the server that we've # received them. ack_id_gen = (message.ack_id for message in received_messages) - expired_ack_ids = set(self._send_lease_modacks(ack_id_gen, self.ack_deadline)) + expired_ack_ids = set( + self._send_lease_modacks( + ack_id_gen, self.ack_deadline, warn_on_invalid=False + ) + ) with self._pause_resume_lock: assert self._scheduler is not None diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index b4f76f20b..1f28b3f40 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -1846,7 +1846,7 @@ def test__on_response_disable_exactly_once(): assert manager._stream_ack_deadline == 60 -def test__on_response_exactly_once_immediate_modacks_fail(): +def test__on_response_exactly_once_immediate_modacks_fail(caplog): manager, _, dispatcher, leaser, _, scheduler = make_running_manager() manager._callback = mock.sentinel.callback @@ -1890,7 +1890,8 @@ def complete_futures_with_error(*args, **kwargs): fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=10) - manager._on_response(response) + with caplog.at_level(logging.WARNING): + manager._on_response(response) # The second messages should be scheduled, and not the first. @@ -1902,6 +1903,14 @@ def complete_futures_with_error(*args, **kwargs): assert call_args[1].message_id == "2" assert manager._messages_on_hold.size == 0 + + expected_warnings = [ + record.message.lower() + for record in caplog.records + if "AcknowledgeError when lease-modacking a message." in record.message + ] + assert len(expected_warnings) == 1 + # No messages available assert manager._messages_on_hold.get() is None @@ -1909,6 +1918,78 @@ def complete_futures_with_error(*args, **kwargs): assert manager.load == 0.001 +def test__on_response_exactly_once_immediate_modacks_fail_non_invalid(caplog): + manager, _, dispatcher, leaser, _, scheduler = make_running_manager() + manager._callback = mock.sentinel.callback + + def complete_futures_with_error(*args, **kwargs): + modack_requests = args[0] + for req in modack_requests: + if req.ack_id == "fack": + req.future.set_exception( + subscriber_exceptions.AcknowledgeError( + subscriber_exceptions.AcknowledgeStatus.OTHER, None + ) + ) + else: + req.future.set_exception( + subscriber_exceptions.AcknowledgeError( + subscriber_exceptions.AcknowledgeStatus.SUCCESS, None + ) + ) + + dispatcher.modify_ack_deadline.side_effect = complete_futures_with_error + + # Set up the messages. + response = gapic_types.StreamingPullResponse( + received_messages=[ + gapic_types.ReceivedMessage( + ack_id="fack", + message=gapic_types.PubsubMessage(data=b"foo", message_id="1"), + ), + gapic_types.ReceivedMessage( + ack_id="good", + message=gapic_types.PubsubMessage(data=b"foo", message_id="2"), + ), + ], + subscription_properties=gapic_types.StreamingPullResponse.SubscriptionProperties( + exactly_once_delivery_enabled=True + ), + ) + + # Actually run the method and prove that modack and schedule are called in + # the expected way. + + fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=10) + + with caplog.at_level(logging.WARNING): + manager._on_response(response) + + # The second messages should be scheduled, and not the first. + + schedule_calls = scheduler.schedule.mock_calls + assert len(schedule_calls) == 2 + call_args = schedule_calls[0][1] + assert call_args[0] == mock.sentinel.callback + assert isinstance(call_args[1], message.Message) + assert call_args[1].message_id == "1" + + assert manager._messages_on_hold.size == 0 + + expected_warnings = [ + record.message.lower() + for record in caplog.records + if "AcknowledgeError when lease-modacking a message." in record.message + ] + assert len(expected_warnings) == 2 + + # No messages available + assert manager._messages_on_hold.get() is None + + # do not add message + assert manager.load == 0.002 + + def test__should_recover_true(): manager = make_manager()